Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor HTTP, HTTPS and TCP example code #1068

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ include = [
"./README.md",
"Cargo.toml",
"src/**/*",
"/examples/main.rs",
"/examples/minimal.rs",
"/examples/*",
"assets/certificate.pem",
"assets/certificate_chain.pem",
"assets/key.pem",
Expand Down
2 changes: 1 addition & 1 deletion lib/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# sozu_lib, a proxy development library

`sozu_lib` provides tools to write a proxy that can be reconfigured
without any downtime. See `examples/minimal.rs` for a small example
without any downtime. See `examples/http.rs` for a small example
of starting a HTTP proxy with one cluster.

A proxy starts as an event loop with which you communicate through
Expand Down
9 changes: 7 additions & 2 deletions lib/examples/minimal.rs → lib/examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ fn main() -> anyhow::Result<()> {
let worker_thread_join_handle = thread::spawn(move || {
let max_buffers = 500;
let buffer_size = 16384;
sozu_lib::http::start_http_worker(http_listener, proxy_channel, max_buffers, buffer_size)
.expect("The worker could not be started, or shut down");
sozu_lib::http::testing::start_http_worker(
http_listener,
proxy_channel,
max_buffers,
buffer_size,
)
.expect("The worker could not be started, or shut down");
});

let cluster = Cluster {
Expand Down
14 changes: 12 additions & 2 deletions lib/examples/main.rs → lib/examples/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ fn main() -> anyhow::Result<()> {
let jg = thread::spawn(move || {
let max_buffers = 500;
let buffer_size = 16384;
sozu_lib::http::start_http_worker(http_listener, channel, max_buffers, buffer_size);
sozu_lib::http::testing::start_http_worker(
http_listener,
channel,
max_buffers,
buffer_size,
);
});

let http_front = RequestHttpFrontend {
Expand Down Expand Up @@ -82,7 +87,12 @@ fn main() -> anyhow::Result<()> {
let jg2 = thread::spawn(move || {
let max_buffers = 500;
let buffer_size = 16384;
sozu_lib::https::start_https_worker(https_listener, channel2, max_buffers, buffer_size)
sozu_lib::https::testing::start_https_worker(
https_listener,
channel2,
max_buffers,
buffer_size,
)
});

let cert1 = include_str!("../assets/certificate.pem");
Expand Down
2 changes: 1 addition & 1 deletion lib/examples/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() -> anyhow::Result<()> {
..Default::default()
};
setup_logging("stdout", None, "debug", "TCP");
sozu_lib::tcp::start_tcp_worker(listener, max_buffers, buffer_size, channel);
sozu_lib::tcp::testing::start_tcp_worker(listener, max_buffers, buffer_size, channel);
});

let tcp_front = RequestTcpFrontend {
Expand Down
176 changes: 70 additions & 106 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
io::ErrorKind,
net::{Shutdown, SocketAddr},
os::unix::io::{AsRawFd, IntoRawFd},
os::unix::io::AsRawFd,
rc::{Rc, Weak},
str::from_utf8_unchecked,
};

use anyhow::Context;
use mio::{
net::{TcpListener, TcpStream, UnixStream},
net::{TcpListener, TcpStream},
unix::SourceFd,
Interest, Poll, Registry, Token,
Interest, Registry, Token,
};
use rusty_ulid::Ulid;
use slab::Slab;
use time::{Duration, Instant};

use sozu_command::{
Expand All @@ -27,7 +25,6 @@ use sozu_command::{
ready::Ready,
request::WorkerRequest,
response::{HttpFrontend, WorkerResponse},
scm_socket::{Listeners, ScmSocket},
state::ClusterId,
};

Expand All @@ -43,7 +40,7 @@ use crate::{
Http, Pipe, SessionState,
},
router::{Route, Router},
server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager},
server::{ListenToken, SessionManager},
socket::server_bind,
timer::TimeoutContainer,
AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
Expand Down Expand Up @@ -994,113 +991,80 @@ impl L7Proxy for HttpProxy {
}
}

/// This is starts an HTTP worker with an HTTP listener config.
/// It activates the Listener automatically.
pub fn start_http_worker(
config: HttpListenerConfig,
channel: ProxyChannel,
max_buffers: usize,
buffer_size: usize,
) -> anyhow::Result<()> {
use crate::server;

let event_loop = Poll::new().with_context(|| "could not create event loop")?;

let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
max_buffers,
buffer_size,
)));
let backends = Rc::new(RefCell::new(BackendMap::new()));
let mut sessions: Slab<Rc<RefCell<dyn ProxySession>>> = Slab::with_capacity(max_buffers);
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for channel", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for timer", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
}
{
let entry = sessions.vacant_entry();
info!("taking token {:?} for metrics", entry.key());
entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
}

let token = {
let entry = sessions.vacant_entry();
let key = entry.key();
let _e = entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
Token(key)
};
pub mod testing {
use crate::testing::*;

let address = config.address.clone();
let sessions = SessionManager::new(sessions, max_buffers);
let registry = event_loop
.registry()
.try_clone()
.with_context(|| "Failed at creating a registry")?;
let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
let _ = proxy.add_listener(config, token);
let _ = proxy.activate_listener(
&address
/// this function is not used, but is available for example and testing purposes
pub fn start_http_worker(
config: HttpListenerConfig,
channel: ProxyChannel,
max_buffers: usize,
buffer_size: usize,
) -> anyhow::Result<()> {
let address = config
.address
.parse()
.with_context(|| "Could not parse socket address")?,
None,
);
let (scm_server, scm_client) =
UnixStream::pair().with_context(|| "Failed at creating scm stream sockets")?;
let client_scm_socket =
ScmSocket::new(scm_client.into_raw_fd()).with_context(|| "Could not create scm socket")?;
let server_scm_socket =
ScmSocket::new(scm_server.as_raw_fd()).with_context(|| "Could not create scm socket")?;

if let Err(e) = client_scm_socket.send_listeners(&Listeners::default()) {
error!("error sending empty listeners: {:?}", e);
}
.with_context(|| "Could not parse socket address")?;

let server_config = server::ServerConfig {
max_connections: max_buffers,
..Default::default()
};
let ServerParts {
event_loop,
registry,
sessions,
pool,
backends,
client_scm_socket: _,
server_scm_socket,
server_config,
} = prebuild_server(max_buffers, buffer_size, true)?;

let token = {
let mut sessions = sessions.borrow_mut();
let entry = sessions.slab.vacant_entry();
let key = entry.key();
let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
protocol: Protocol::HTTPListen,
})));
Token(key)
};

let mut server = Server::new(
event_loop,
channel,
server_scm_socket,
sessions,
pool,
backends,
Some(proxy),
None,
None,
server_config,
None,
false,
)
.with_context(|| "Failed at creating server")?;

debug!("starting event loop");
server.run();
debug!("ending event loop");
Ok(())
let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
proxy
.add_listener(config, token)
.with_context(|| "Failed at creating adding the listener")?;
proxy
.activate_listener(&address, None)
.with_context(|| "Failed at creating activating the listener")?;

let mut server = Server::new(
event_loop,
channel,
server_scm_socket,
sessions,
pool,
backends,
Some(proxy),
None,
None,
server_config,
None,
false,
)
.with_context(|| "Failed at creating server")?;

debug!("starting event loop");
server.run();
debug!("ending event loop");
Ok(())
}
}

#[cfg(test)]
mod tests {
extern crate tiny_http;

use super::testing::start_http_worker;
use super::*;

use crate::sozu_command::{
channel::Channel,
config::ListenerBuilder,
Expand Down Expand Up @@ -1181,7 +1145,7 @@ mod tests {
println!("test received: {:?}", command.read_message());
println!("test received: {:?}", command.read_message());

let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not parse address");
let mut client = TcpStream::connect(("127.0.0.1", 1024)).expect("could not connect");

// 5 seconds of timeout
client.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
Expand Down Expand Up @@ -1264,7 +1228,7 @@ mod tests {
println!("test received: {:?}", command.read_message());
println!("test received: {:?}", command.read_message());

let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not parse address");
let mut client = TcpStream::connect(("127.0.0.1", 1031)).expect("could not connect");
// 5 seconds of timeout
client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

Expand Down Expand Up @@ -1387,7 +1351,7 @@ mod tests {
println!("test received: {:?}", command.read_message());
println!("test received: {:?}", command.read_message());

let mut client = TcpStream::connect(("127.0.0.1", 1041)).expect("could not parse address");
let mut client = TcpStream::connect(("127.0.0.1", 1041)).expect("could not connect");
// 5 seconds of timeout
client.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

Expand Down
Loading
Loading