diff --git a/lib/examples/minimal.rs b/lib/examples/http.rs similarity index 96% rename from lib/examples/minimal.rs rename to lib/examples/http.rs index 96949c7d6..9afcccdd8 100644 --- a/lib/examples/minimal.rs +++ b/lib/examples/http.rs @@ -37,7 +37,7 @@ fn main() -> anyhow::Result<()> { let worker_thread_join_handle = thread::spawn(move || { let max_buffers = 500; let buffer_size = 16384; - sozu_lib::http::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size) + sozu_lib::http::testing::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size) .expect("The worker could not be started, or shut down"); }); diff --git a/lib/examples/main.rs b/lib/examples/https.rs similarity index 94% rename from lib/examples/main.rs rename to lib/examples/https.rs index d8d0705aa..036ade116 100644 --- a/lib/examples/main.rs +++ b/lib/examples/https.rs @@ -42,7 +42,12 @@ fn main() -> anyhow::Result<()> { let jg = thread::spawn(move || { let max_buffers = 500; let buffer_size = 16384; - sozu_lib::http::start_http_worker(http_listener, channel, max_buffers, buffer_size); + sozu_lib::http::testing::start_http_worker( + http_listener, + channel, + max_buffers, + buffer_size, + ); }); let http_front = RequestHttpFrontend { @@ -82,7 +87,12 @@ fn main() -> anyhow::Result<()> { let jg2 = thread::spawn(move || { let max_buffers = 500; let buffer_size = 16384; - sozu_lib::https::start_https_worker(https_listener, channel2, max_buffers, buffer_size) + sozu_lib::https::testing::start_https_worker( + https_listener, + channel2, + max_buffers, + buffer_size, + ) }); let cert1 = include_str!("../assets/certificate.pem"); diff --git a/lib/examples/tcp.rs b/lib/examples/tcp.rs index 248f32840..4ba4576ae 100644 --- a/lib/examples/tcp.rs +++ b/lib/examples/tcp.rs @@ -34,7 +34,7 @@ fn main() -> anyhow::Result<()> { ..Default::default() }; setup_logging("stdout", None, "debug", "TCP"); - sozu_lib::tcp::start_tcp_worker(listener, max_buffers, buffer_size, channel); + sozu_lib::tcp::testing::start_tcp_worker(listener, max_buffers, buffer_size, channel); }); let tcp_front = RequestTcpFrontend { diff --git a/lib/src/http.rs b/lib/src/http.rs index 71504367a..2c97976e1 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -3,19 +3,17 @@ use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, io::ErrorKind, net::{Shutdown, SocketAddr}, - os::unix::io::{AsRawFd, IntoRawFd}, + os::unix::io::AsRawFd, rc::{Rc, Weak}, str::from_utf8_unchecked, }; -use anyhow::Context; use mio::{ - net::{TcpListener, TcpStream, UnixStream}, + net::{TcpListener, TcpStream}, unix::SourceFd, - Interest, Poll, Registry, Token, + Interest, Registry, Token, }; use rusty_ulid::Ulid; -use slab::Slab; use time::{Duration, Instant}; use sozu_command::{ @@ -27,7 +25,6 @@ use sozu_command::{ ready::Ready, request::WorkerRequest, response::{HttpFrontend, WorkerResponse}, - scm_socket::{Listeners, ScmSocket}, state::ClusterId, }; @@ -43,7 +40,7 @@ use crate::{ Http, Pipe, SessionState, }, router::{Route, Router}, - server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager}, + server::{ListenToken, SessionManager}, socket::server_bind, timer::TimeoutContainer, AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, @@ -994,113 +991,80 @@ impl L7Proxy for HttpProxy { } } -/// This is starts an HTTP worker with an HTTP listener config. -/// It activates the Listener automatically. -pub fn start_http_worker( - config: HttpListenerConfig, - channel: ProxyChannel, - max_buffers: usize, - buffer_size: usize, -) -> anyhow::Result<()> { - use crate::server; - - let event_loop = Poll::new().with_context(|| "could not create event loop")?; - - let pool = Rc::new(RefCell::new(Pool::with_capacity( - 1, - max_buffers, - buffer_size, - ))); - let backends = Rc::new(RefCell::new(BackendMap::new())); - let mut sessions: Slab>> = Slab::with_capacity(max_buffers); - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for channel", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for timer", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for metrics", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - - let token = { - let entry = sessions.vacant_entry(); - let key = entry.key(); - let _e = entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - Token(key) - }; +pub mod testing { + use crate::testing::*; - let address = config.address.clone(); - let sessions = SessionManager::new(sessions, max_buffers); - let registry = event_loop - .registry() - .try_clone() - .with_context(|| "Failed at creating a registry")?; - let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone()); - let _ = proxy.add_listener(config, token); - let _ = proxy.activate_listener( - &address + /// this function is not used, but is available for example and testing purposes + pub fn start_http_worker( + config: HttpListenerConfig, + channel: ProxyChannel, + max_buffers: usize, + buffer_size: usize, + ) -> anyhow::Result<()> { + let address = config + .address .parse() - .with_context(|| "Could not parse socket address")?, - None, - ); - let (scm_server, scm_client) = - UnixStream::pair().with_context(|| "Failed at creating scm stream sockets")?; - let client_scm_socket = - ScmSocket::new(scm_client.into_raw_fd()).with_context(|| "Could not create scm socket")?; - let server_scm_socket = - ScmSocket::new(scm_server.as_raw_fd()).with_context(|| "Could not create scm socket")?; - - if let Err(e) = client_scm_socket.send_listeners(&Listeners::default()) { - error!("error sending empty listeners: {:?}", e); - } + .with_context(|| "Could not parse socket address")?; - let server_config = server::ServerConfig { - max_connections: max_buffers, - ..Default::default() - }; + let ServerParts { + event_loop, + registry, + sessions, + pool, + backends, + client_scm_socket: _, + server_scm_socket, + server_config, + } = prebuild_server(max_buffers, buffer_size, true)?; + + let token = { + let mut sessions = sessions.borrow_mut(); + let entry = sessions.slab.vacant_entry(); + let key = entry.key(); + let _ = entry.insert(Rc::new(RefCell::new(ListenSession { + protocol: Protocol::HTTPListen, + }))); + Token(key) + }; - let mut server = Server::new( - event_loop, - channel, - server_scm_socket, - sessions, - pool, - backends, - Some(proxy), - None, - None, - server_config, - None, - false, - ) - .with_context(|| "Failed at creating server")?; - - debug!("starting event loop"); - server.run(); - debug!("ending event loop"); - Ok(()) + let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone()); + proxy + .add_listener(config, token) + .with_context(|| "Failed at creating adding the listener")?; + proxy + .activate_listener(&address, None) + .with_context(|| "Failed at creating activating the listener")?; + + let mut server = Server::new( + event_loop, + channel, + server_scm_socket, + sessions, + pool, + backends, + Some(proxy), + None, + None, + server_config, + None, + false, + ) + .with_context(|| "Failed at creating server")?; + + debug!("starting event loop"); + server.run(); + debug!("ending event loop"); + Ok(()) + } } #[cfg(test)] mod tests { extern crate tiny_http; + use super::testing::start_http_worker; use super::*; + use crate::sozu_command::{ channel::Channel, config::ListenerBuilder, @@ -1181,7 +1145,7 @@ mod tests { println!("test received: {:?}", command.read_message()); println!("test received: {:?}", command.read_message()); - let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not parse address"); + let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect"); // 5 seconds of timeout client.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); @@ -1264,7 +1228,7 @@ mod tests { println!("test received: {:?}", command.read_message()); println!("test received: {:?}", command.read_message()); - let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not parse address"); + let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect"); // 5 seconds of timeout client.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); @@ -1387,7 +1351,7 @@ mod tests { println!("test received: {:?}", command.read_message()); println!("test received: {:?}", command.read_message()); - let mut client = TcpStream::connect(("127.0.0.1", 1041)).expect("could not parse address"); + let mut client = TcpStream::connect(("127.0.0.1", 1041)).expect("could not connect"); // 5 seconds of timeout client.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); diff --git a/lib/src/https.rs b/lib/src/https.rs index 57ab9d312..c1927a2c3 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -3,17 +3,16 @@ use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, io::ErrorKind, net::{Shutdown, SocketAddr as StdSocketAddr}, - os::unix::{io::AsRawFd, net::UnixStream}, + os::unix::io::AsRawFd, rc::{Rc, Weak}, str::{from_utf8, from_utf8_unchecked}, sync::Arc, }; -use anyhow::Context; use mio::{ net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream}, unix::SourceFd, - Interest, Poll, Registry, Token, + Interest, Registry, Token, }; use rustls::{ crypto::{ @@ -32,7 +31,6 @@ use rustls::{ CipherSuite, ProtocolVersion, ServerConfig, ServerConnection, SupportedCipherSuite, }; use rusty_ulid::Ulid; -use slab::Slab; use time::{Duration, Instant}; use sozu_command::{ @@ -48,7 +46,6 @@ use sozu_command::{ ready::Ready, request::WorkerRequest, response::{HttpFrontend, WorkerResponse}, - scm_socket::ScmSocket, state::ClusterId, }; @@ -66,7 +63,7 @@ use crate::{ Http, Pipe, SessionState, }, router::{Route, Router}, - server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager, SessionToken}, + server::{ListenToken, SessionManager}, socket::{server_bind, FrontRustls}, timer::TimeoutContainer, tls::{CertifiedKeyWrapper, MutexWrappedCertificateResolver, ResolveCertificate}, @@ -1498,83 +1495,54 @@ fn rustls_ciphersuite_str(cipher: SupportedCipherSuite) -> &'static str { } } -/// this function is not used, but is available for example and testing purposes -pub fn start_https_worker( - config: HttpsListenerConfig, - channel: ProxyChannel, - max_buffers: usize, - buffer_size: usize, -) -> anyhow::Result<()> { - use crate::server; - - let event_loop = Poll::new().with_context(|| "could not create event loop")?; - - let pool = Rc::new(RefCell::new(Pool::with_capacity( - 1, - max_buffers, - buffer_size, - ))); - let backends = Rc::new(RefCell::new(BackendMap::new())); - - let mut sessions: Slab>> = Slab::with_capacity(max_buffers); - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for channel", SessionToken(entry.key())); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for timer", SessionToken(entry.key())); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for metrics", SessionToken(entry.key())); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - - let token = { - let entry = sessions.vacant_entry(); - let key = entry.key(); - let _e = entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - Token(key) - }; - - let sessions = SessionManager::new(sessions, max_buffers); - let registry = event_loop - .registry() - .try_clone() - .with_context(|| "Failed at creating a registry")?; - let mut proxy = HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone()); - let address = config.address.clone(); - if proxy.add_listener(config, token).is_some() - && proxy - .activate_listener( - &address - .parse() - .with_context(|| "Could not parse socket address")?, - None, - ) - .is_ok() - { - let (scm_server, _scm_client) = - UnixStream::pair().with_context(|| "Failed at creating scm stream sockets")?; - let server_config = server::ServerConfig { - max_connections: max_buffers, - ..Default::default() +pub mod testing { + use crate::testing::*; + + /// this function is not used, but is available for example and testing purposes + pub fn start_https_worker( + config: HttpsListenerConfig, + channel: ProxyChannel, + max_buffers: usize, + buffer_size: usize, + ) -> anyhow::Result<()> { + let address = config + .address + .parse() + .with_context(|| "Could not parse socket address")?; + + let ServerParts { + event_loop, + registry, + sessions, + pool, + backends, + client_scm_socket: _, + server_scm_socket, + server_config, + } = prebuild_server(max_buffers, buffer_size, true)?; + + let token = { + let mut sessions = sessions.borrow_mut(); + let entry = sessions.slab.vacant_entry(); + let key = entry.key(); + let _ = entry.insert(Rc::new(RefCell::new(ListenSession { + protocol: Protocol::HTTPSListen, + }))); + Token(key) }; + + let mut proxy = HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone()); + proxy + .add_listener(config, token) + .with_context(|| "Failed at creating adding the listener")?; + proxy + .activate_listener(&address, None) + .with_context(|| "Failed at creating activating the listener")?; + let mut server = Server::new( event_loop, channel, - ScmSocket::new(scm_server.as_raw_fd()).unwrap(), + server_scm_socket, sessions, pool, backends, @@ -1585,25 +1553,25 @@ pub fn start_https_worker( None, false, ) - .with_context(|| "Failed to create server")?; + .with_context(|| "Failed at creating server")?; - info!("starting event loop"); + debug!("starting event loop"); server.run(); - info!("ending event loop"); + debug!("ending event loop"); + Ok(()) } - Ok(()) } #[cfg(test)] mod tests { + use super::*; + use std::{str::FromStr, sync::Arc}; use sozu_command::config::ListenerBuilder; use crate::router::{trie::TrieNode, MethodRule, PathRule, Route, Router}; - use super::*; - /* #[test] #[cfg(target_pointer_width = "64")] diff --git a/lib/src/lib.rs b/lib/src/lib.rs index c6b2ea4a1..203a7944b 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1116,3 +1116,110 @@ impl PeakEWMA { (active_requests + 1) as f64 * self.rtt } } + +pub mod testing { + pub use std::{cell::RefCell, os::fd::IntoRawFd, rc::Rc}; + + pub use anyhow::Context; + pub use mio::{net::UnixStream, Poll, Registry, Token}; + pub use slab::Slab; + pub use sozu_command::{ + proto::command::{HttpListenerConfig, HttpsListenerConfig, TcpListenerConfig}, + scm_socket::{Listeners, ScmSocket}, + }; + + pub use crate::{ + backends::BackendMap, + http::HttpProxy, + https::HttpsProxy, + pool::Pool, + server::Server, + server::{ListenSession, ProxyChannel, ServerConfig, SessionManager}, + tcp::TcpProxy, + Protocol, ProxySession, + }; + + /// Everything needed to create a Server + pub struct ServerParts { + pub event_loop: Poll, + pub registry: Registry, + pub sessions: Rc>, + pub pool: Rc>, + pub backends: Rc>, + pub client_scm_socket: ScmSocket, + pub server_scm_socket: ScmSocket, + pub server_config: ServerConfig, + } + + /// Setup a standalone server, for testing purposes + pub fn prebuild_server( + max_buffers: usize, + buffer_size: usize, + send_scm: bool, + ) -> anyhow::Result { + let event_loop = Poll::new().with_context(|| "Failed at creating event loop")?; + let backends = Rc::new(RefCell::new(BackendMap::new())); + let server_config = ServerConfig { + max_connections: max_buffers, + ..Default::default() + }; + + let pool = Rc::new(RefCell::new(Pool::with_capacity( + 1, + max_buffers, + buffer_size, + ))); + + let mut sessions: Slab>> = Slab::with_capacity(max_buffers); + { + let entry = sessions.vacant_entry(); + info!("taking token {:?} for channel", entry.key()); + entry.insert(Rc::new(RefCell::new(ListenSession { + protocol: Protocol::Channel, + }))); + } + { + let entry = sessions.vacant_entry(); + info!("taking token {:?} for timer", entry.key()); + entry.insert(Rc::new(RefCell::new(ListenSession { + protocol: Protocol::Timer, + }))); + } + { + let entry = sessions.vacant_entry(); + info!("taking token {:?} for metrics", entry.key()); + entry.insert(Rc::new(RefCell::new(ListenSession { + protocol: Protocol::Metrics, + }))); + } + let sessions = SessionManager::new(sessions, max_buffers); + + let registry = event_loop + .registry() + .try_clone() + .with_context(|| "Failed at creating a registry")?; + + let (scm_server, scm_client) = + UnixStream::pair().with_context(|| "Failed at creating scm unix stream")?; + let client_scm_socket = ScmSocket::new(scm_client.into_raw_fd()) + .with_context(|| "Failed at creating the scm client socket")?; + let server_scm_socket = ScmSocket::new(scm_server.into_raw_fd()) + .with_context(|| "Failed at creating the scm server socket")?; + if send_scm { + client_scm_socket + .send_listeners(&Listeners::default()) + .with_context(|| "Failed at sending empty listeners")?; + } + + Ok(ServerParts { + event_loop, + registry, + sessions, + pool, + backends, + client_scm_socket, + server_scm_socket, + server_config, + }) + } +} diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index bdf70a66e..ff22be9dd 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -7,15 +7,11 @@ use std::{ rc::Rc, }; -use anyhow::Context; use mio::{ - net::TcpListener as MioTcpListener, - net::{TcpStream as MioTcpStream, UnixStream}, - unix::SourceFd, - Interest, Poll, Registry, Token, + net::TcpListener as MioTcpListener, net::TcpStream as MioTcpStream, unix::SourceFd, Interest, + Registry, Token, }; use rusty_ulid::Ulid; -use slab::Slab; use time::{Duration, Instant}; use sozu_command::{config::MAX_LOOP_ITERATIONS, proto::command::request::RequestType, ObjectKind}; @@ -31,10 +27,7 @@ use crate::{ Pipe, }, retry::RetryPolicy, - server::{ - push_event, ListenSession, ListenToken, ProxyChannel, Server, SessionManager, CONN_RETRIES, - TIMER, - }, + server::{push_event, ListenToken, SessionManager, CONN_RETRIES, TIMER}, socket::{server_bind, stats::socket_rtt}, sozu_command::{ logging, @@ -44,7 +37,6 @@ use crate::{ ready::Ready, request::WorkerRequest, response::WorkerResponse, - scm_socket::ScmSocket, state::ClusterId, }, timer::TimeoutContainer, @@ -1473,108 +1465,95 @@ impl ProxyConfiguration for TcpProxy { } } -/// This is not directly used by Sōzu but is available for example and testing purposes -pub fn start_tcp_worker( - config: TcpListenerConfig, - max_buffers: usize, - buffer_size: usize, - channel: ProxyChannel, -) -> anyhow::Result<()> { - use crate::server; - - let poll = Poll::new().with_context(|| "could not create event loop")?; - let pool = Rc::new(RefCell::new(Pool::with_capacity( - 1, - max_buffers, - buffer_size, - ))); - let backends = Rc::new(RefCell::new(BackendMap::new())); - - let mut sessions: Slab>> = Slab::with_capacity(max_buffers); - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for channel", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::TCPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for timer", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::TCPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for metrics", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::TCPListen, - }))); - } +pub mod testing { + use crate::testing::*; - let token = { - let entry = sessions.vacant_entry(); - let key = entry.key(); - let _e = entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::TCPListen, - }))); - Token(key) - }; + /// This is not directly used by Sōzu but is available for example and testing purposes + pub fn start_tcp_worker( + config: TcpListenerConfig, + max_buffers: usize, + buffer_size: usize, + channel: ProxyChannel, + ) -> anyhow::Result<()> { + let address = config + .address + .parse() + .with_context(|| "Could not parse socket address")?; - let sessions = SessionManager::new(sessions, max_buffers); - let address = config.address.clone(); - let registry = poll - .registry() - .try_clone() - .with_context(|| "Failed at creating a registry")?; - let mut configuration = TcpProxy::new(registry, sessions.clone(), backends.clone()); - let _ = configuration.add_listener(config, pool.clone(), token); - let _ = configuration.activate_listener(&address.parse().unwrap(), None); - let (scm_server, _scm_client) = - UnixStream::pair().with_context(|| "Failed at creating scm stream sockets")?; - let scm_socket = - ScmSocket::new(scm_server.as_raw_fd()).with_context(|| "Could not create scm socket")?; - let server_config = server::ServerConfig { - max_connections: max_buffers, - ..Default::default() - }; + let ServerParts { + event_loop, + registry, + sessions, + pool, + backends, + client_scm_socket: _, + server_scm_socket, + server_config, + } = prebuild_server(max_buffers, buffer_size, true)?; + + let token = { + let mut sessions = sessions.borrow_mut(); + let entry = sessions.slab.vacant_entry(); + let key = entry.key(); + let _ = entry.insert(Rc::new(RefCell::new(ListenSession { + protocol: Protocol::TCPListen, + }))); + Token(key) + }; - let mut server = Server::new( - poll, - channel, - scm_socket, - sessions, - pool, - backends, - None, - None, - Some(configuration), - server_config, - None, - false, - ) - .with_context(|| "Could not create tcp server")?; - - info!("starting event loop"); - server.run(); - info!("ending event loop"); - Ok(()) + let mut proxy = TcpProxy::new(registry, sessions.clone(), backends.clone()); + proxy + .add_listener(config, pool.clone(), token) + .with_context(|| "Failed at creating adding the listener")?; + proxy + .activate_listener(&address, None) + .with_context(|| "Failed at creating activating the listener")?; + + let mut server = Server::new( + event_loop, + channel, + server_scm_socket, + sessions, + pool, + backends, + None, + None, + Some(proxy), + server_config, + None, + false, + ) + .with_context(|| "Failed at creating server")?; + + debug!("starting event loop"); + server.run(); + debug!("ending event loop"); + Ok(()) + } } #[cfg(test)] mod tests { - use super::*; - use crate::sozu_command::{ - channel::Channel, proto::command::LoadBalancingParams, scm_socket::Listeners, - }; + use super::testing::start_tcp_worker; + use crate::testing::*; + use std::{ io::{Read, Write}, net::{Shutdown, TcpListener, TcpStream}, - os::unix::io::IntoRawFd, - sync::atomic::{AtomicBool, Ordering}, - sync::{Arc, Barrier}, - {str, thread}, + str, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Barrier, + }, + thread, + }; + + use sozu_command::{ + channel::Channel, + config::ListenerBuilder, + proto::command::{request::RequestType, LoadBalancingParams, RequestTcpFrontend}, + request::WorkerRequest, + response::WorkerResponse, }; static TEST_FINISHED: AtomicBool = AtomicBool::new(false); @@ -1599,9 +1578,9 @@ mod tests { let _tx = start_proxy().expect("Could not start proxy"); barrier.wait(); - let mut s1 = TcpStream::connect("127.0.0.1:1234").expect("could not parse address"); - let s3 = TcpStream::connect("127.0.0.1:1234").expect("could not parse address"); - let mut s2 = TcpStream::connect("127.0.0.1:1234").expect("could not parse address"); + let mut s1 = TcpStream::connect("127.0.0.1:1234").expect("could not connect"); + let s3 = TcpStream::connect("127.0.0.1:1234").expect("could not connect"); + let mut s2 = TcpStream::connect("127.0.0.1:1234").expect("could not connect"); s1.write(&b"hello "[..]) .map_err(|e| { @@ -1656,7 +1635,7 @@ mod tests { } fn start_server(barrier: Arc) { - let listener = TcpListener::bind("127.0.0.1:5678").expect("could not parse address"); + let listener = TcpListener::bind("127.0.0.1:5678").expect("could not bind"); fn handle_client(stream: &mut TcpStream, id: u8) { let mut buf = [0; 128]; let _response = b" END"; @@ -1694,102 +1673,15 @@ mod tests { /// used in tests only pub fn start_proxy() -> anyhow::Result> { - use crate::server; + let config = ListenerBuilder::new_tcp("127.0.0.1:1234") + .to_tcp(None) + .expect("could not create listener config"); - info!("listen for connections"); let (mut command, channel) = Channel::generate(1000, 10000).with_context(|| "should create a channel")?; - - // this thread should call a start() function that performs the same logic and returns Result<()> - // any error coming from this start() would be mapped and logged within the thread - thread::spawn(move || { + let _jg = thread::spawn(move || { setup_test_logger!(); - info!("starting event loop"); - let poll = Poll::new().expect("could not create event loop"); - let max_connections = 100; - let buffer_size = 16384; - let pool = Rc::new(RefCell::new(Pool::with_capacity( - 1, - 2 * max_connections, - buffer_size, - ))); - let backends = Rc::new(RefCell::new(BackendMap::new())); - - let mut sessions: Slab>> = - Slab::with_capacity(max_connections); - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for channel", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for timer", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - { - let entry = sessions.vacant_entry(); - info!("taking token {:?} for metrics", entry.key()); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::HTTPListen, - }))); - } - - let sessions = SessionManager::new(sessions, max_connections); - let registry = poll.registry().try_clone().unwrap(); - let mut configuration = TcpProxy::new(registry, sessions.clone(), backends.clone()); - let listener_config = TcpListenerConfig { - address: "127.0.0.1:1234".to_string(), - ..Default::default() - }; - - { - let address = listener_config.address.clone(); - let mut s = sessions.borrow_mut(); - let entry = s.slab.vacant_entry(); - let _ = - configuration.add_listener(listener_config, pool.clone(), Token(entry.key())); - let _ = configuration.activate_listener(&address.parse().unwrap(), None); - entry.insert(Rc::new(RefCell::new(ListenSession { - protocol: Protocol::TCPListen, - }))); - } - - let (scm_server, scm_client) = UnixStream::pair().unwrap(); - let client_scm_socket = - ScmSocket::new(scm_client.into_raw_fd()).expect("Could not create scm socket"); - let server_scm_socket = - ScmSocket::new(scm_server.as_raw_fd()).expect("Could not create scm socket"); - client_scm_socket - .send_listeners(&Listeners::default()) - .unwrap(); - - let server_config = server::ServerConfig { - max_connections, - ..Default::default() - }; - let mut server = Server::new( - poll, - channel, - server_scm_socket, - sessions, - pool, - backends, - None, - None, - Some(configuration), - server_config, - None, - false, - ) - .expect("Failed at creating the server"); - info!("will run"); - server.run(); - info!("ending event loop"); + start_tcp_worker(config, 100, 16384, channel).expect("could not start the tcp server"); }); command.blocking().unwrap();