Skip to content

Commit

Permalink
Merge pull request #1068 from sozu-proxy/devel/edemolis/fix/examples
Browse files Browse the repository at this point in the history
Refactor HTTP, HTTPS and TCP example code
  • Loading branch information
Keksoj authored Feb 2, 2024
2 parents 66114a4 + 95624e6 commit bd356d0
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 401 deletions.
3 changes: 1 addition & 2 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ include = [
"./README.md",
"Cargo.toml",
"src/**/*",
"/examples/main.rs",
"/examples/minimal.rs",
"/examples/*",
"assets/certificate.pem",
"assets/certificate_chain.pem",
"assets/key.pem",
Expand Down
2 changes: 1 addition & 1 deletion lib/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# sozu_lib, a proxy development library

`sozu_lib` provides tools to write a proxy that can be reconfigured
without any downtime. See `examples/minimal.rs` for a small example
without any downtime. See `examples/http.rs` for a small example
of starting a HTTP proxy with one cluster.

A proxy starts as an event loop with which you communicate through
Expand Down
9 changes: 7 additions & 2 deletions lib/examples/minimal.rs → lib/examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ fn main() -> anyhow::Result<()> {
let worker_thread_join_handle = thread::spawn(move || {
let max_buffers = 500;
let buffer_size = 16384;
sozu_lib::http::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
.expect("The worker could not be started, or shut down");
sozu_lib::http::testing::start_http_worker(
http_listener,
proxy_channel,
max_buffers,
buffer_size,
)
.expect("The worker could not be started, or shut down");
});

let cluster = Cluster {
Expand Down
14 changes: 12 additions & 2 deletions lib/examples/main.rs → lib/examples/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ fn main() -> anyhow::Result<()> {
let jg = thread::spawn(move || {
let max_buffers = 500;
let buffer_size = 16384;
sozu_lib::http::start_http_worker(http_listener, channel, max_buffers, buffer_size);
sozu_lib::http::testing::start_http_worker(
http_listener,
channel,
max_buffers,
buffer_size,
);
});

let http_front = RequestHttpFrontend {
Expand Down Expand Up @@ -82,7 +87,12 @@ fn main() -> anyhow::Result<()> {
let jg2 = thread::spawn(move || {
let max_buffers = 500;
let buffer_size = 16384;
sozu_lib::https::start_https_worker(https_listener, channel2, max_buffers, buffer_size)
sozu_lib::https::testing::start_https_worker(
https_listener,
channel2,
max_buffers,
buffer_size,
)
});

let cert1 = include_str!("../assets/certificate.pem");
Expand Down
2 changes: 1 addition & 1 deletion lib/examples/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() -> anyhow::Result<()> {
..Default::default()
};
setup_logging("stdout", None, "debug", "TCP");
sozu_lib::tcp::start_tcp_worker(listener, max_buffers, buffer_size, channel);
sozu_lib::tcp::testing::start_tcp_worker(listener, max_buffers, buffer_size, channel);
});

let tcp_front = RequestTcpFrontend {
Expand Down
176 changes: 70 additions & 106 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
io::ErrorKind,
net::{Shutdown, SocketAddr},
os::unix::io::{AsRawFd, IntoRawFd},
os::unix::io::AsRawFd,
rc::{Rc, Weak},
str::from_utf8_unchecked,
};

use anyhow::Context;
use mio::{
net::{TcpListener, TcpStream, UnixStream},
net::{TcpListener, TcpStream},
unix::SourceFd,
Interest, Poll, Registry, Token,
Interest, Registry, Token,
};
use rusty_ulid::Ulid;
use slab::Slab;
use time::{Duration, Instant};

use sozu_command::{
Expand All @@ -27,7 +25,6 @@ use sozu_command::{
ready::Ready,
request::WorkerRequest,
response::{HttpFrontend, WorkerResponse},
scm_socket::{Listeners, ScmSocket},
state::ClusterId,
};

Expand All @@ -43,7 +40,7 @@ use crate::{
Http, Pipe, SessionState,
},
router::{Route, Router},
server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager},
server::{ListenToken, SessionManager},
socket::server_bind,
timer::TimeoutContainer,
AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
Expand Down Expand Up @@ -994,113 +991,80 @@ impl L7Proxy for HttpProxy {
}
}

/// This is starts an HTTP worker with an HTTP listener config.
/// It activates the Listener automatically.
pub fn start_http_worker(
config: HttpListenerConfig,
channel: ProxyChannel,
max_buffers: usize,
buffer_size: usize,
) -> anyhow::Result<()> {
use crate::server;

let event_loop = Poll::new().with_context(|| "could not create event loop")?;

let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
max_buffers,
buffer_size,
)));
let backends = Rc::new(RefCell::new(BackendMap::new()));
let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for channel", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for timer", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for metrics", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
}

let token = {
let entry = sessions.vacant_entry();
let key = entry.key();
let _e = entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
Token(key)
};
pub mod testing {
use crate::testing::*;

let address = config.address.clone();
let sessions = SessionManager::new(sessions, max_buffers);
let registry = event_loop
.registry()
.try_clone()
.with_context(|| "Failed at creating a registry")?;
let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
let _ = proxy.add_listener(config, token);
let _ = proxy.activate_listener(
&address
/// this function is not used, but is available for example and testing purposes
pub fn start_http_worker(
config: HttpListenerConfig,
channel: ProxyChannel,
max_buffers: usize,
buffer_size: usize,
) -> anyhow::Result<()> {
let address = config
.address
.parse()
.with_context(|| "Could not parse socket address")?,
None,
);
let (scm_server, scm_client) =
UnixStream::pair().with_context(|| "Failed at creating scm stream sockets")?;
let client_scm_socket =
ScmSocket::new(scm_client.into_raw_fd()).with_context(|| "Could not create scm socket")?;
let server_scm_socket =
ScmSocket::new(scm_server.as_raw_fd()).with_context(|| "Could not create scm socket")?;

if let Err(e) = client_scm_socket.send_listeners(&Listeners::default()) {
error!("error sending empty listeners: {:?}", e);
}
.with_context(|| "Could not parse socket address")?;

let server_config = server::ServerConfig {
max_connections: max_buffers,
..Default::default()
};
let ServerParts {
event_loop,
registry,
sessions,
pool,
backends,
client_scm_socket: _,
server_scm_socket,
server_config,
} = prebuild_server(max_buffers, buffer_size, true)?;

let token = {
let mut sessions = sessions.borrow_mut();
let entry = sessions.slab.vacant_entry();
let key = entry.key();
let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
Token(key)
};

let mut server = Server::new(
event_loop,
channel,
server_scm_socket,
sessions,
pool,
backends,
Some(proxy),
None,
None,
server_config,
None,
false,
)
.with_context(|| "Failed at creating server")?;

debug!("starting event loop");
server.run();
debug!("ending event loop");
Ok(())
let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
proxy
.add_listener(config, token)
.with_context(|| "Failed at creating adding the listener")?;
proxy
.activate_listener(&address, None)
.with_context(|| "Failed at creating activating the listener")?;

let mut server = Server::new(
event_loop,
channel,
server_scm_socket,
sessions,
pool,
backends,
Some(proxy),
None,
None,
server_config,
None,
false,
)
.with_context(|| "Failed at creating server")?;

debug!("starting event loop");
server.run();
debug!("ending event loop");
Ok(())
}
}

#[cfg(test)]
mod tests {
extern crate tiny_http;

use super::testing::start_http_worker;
use super::*;

use crate::sozu_command::{
channel::Channel,
config::ListenerBuilder,
Expand Down Expand Up @@ -1181,7 +1145,7 @@ mod tests {
println!("test received: {:?}", command.read_message());
println!("test received: {:?}", command.read_message());

let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not parse address");
let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect");

// 5 seconds of timeout
client.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
Expand Down Expand Up @@ -1264,7 +1228,7 @@ mod tests {
println!("test received: {:?}", command.read_message());
println!("test received: {:?}", command.read_message());

let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not parse address");
let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect");
// 5 seconds of timeout
client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

Expand Down Expand Up @@ -1387,7 +1351,7 @@ mod tests {
println!("test received: {:?}", command.read_message());
println!("test received: {:?}", command.read_message());

let mut client = TcpStream::connect(("127.0.0.1", 1041)).expect("could not parse address");
let mut client = TcpStream::connect(("127.0.0.1", 1041)).expect("could not connect");
// 5 seconds of timeout
client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

Expand Down
Loading

0 comments on commit bd356d0

Please sign in to comment.