diff --git a/lib/src/http.rs b/lib/src/http.rs index 1ccea3281..359ba01d9 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -1,6 +1,6 @@ use std::{ cell::RefCell, - collections::{hash_map::Entry, BTreeMap, HashMap}, + collections::{hash_map::Entry, HashMap}, io::ErrorKind, net::{Shutdown, SocketAddr}, os::unix::io::AsRawFd, @@ -16,7 +16,6 @@ use mio::{ use rusty_ulid::Ulid; use sozu_command::{ - logging::CachedTags, proto::command::{ request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend, WorkerRequest, WorkerResponse, @@ -38,9 +37,9 @@ use crate::{ server::{ListenToken, SessionManager}, socket::server_bind, timer::TimeoutContainer, - AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, - ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, - SessionMetrics, SessionResult, StateMachineBuilder, StateResult, + AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, Protocol, + ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, SessionMetrics, + SessionResult, StateMachineBuilder, StateResult, }; #[derive(PartialEq, Eq)] @@ -58,7 +57,7 @@ StateMachineBuilder! { enum HttpStateMachine impl SessionState { Expect(ExpectProxyProtocol), Http(Http), - WebSocket(Pipe), + WebSocket(Pipe), } } @@ -249,11 +248,11 @@ impl HttpSession { http.request_stream.storage.buffer, frontend_token, http.frontend_socket, - self.listener.clone(), Protocol::HTTP, http.context.id, http.context.session_address, websocket_context, + http.context.tags, ); pipe.frontend_readiness.event = http.frontend_readiness.event; @@ -267,7 +266,7 @@ impl HttpSession { Some(HttpStateMachine::WebSocket(pipe)) } - fn upgrade_websocket(&self, ws: Pipe) -> Option { + fn upgrade_websocket(&self, ws: Pipe) -> Option { // what do we do here? error!("Upgrade called on WS, this should not happen"); Some(HttpStateMachine::WebSocket(ws)) @@ -398,27 +397,9 @@ pub struct HttpListener { config: HttpListenerConfig, fronts: Router, listener: Option, - tags: BTreeMap, token: Token, } -impl ListenerHandler for HttpListener { - fn get_addr(&self) -> &SocketAddr { - &self.address - } - - fn get_tags(&self, key: &str) -> Option<&CachedTags> { - self.tags.get(key) - } - - fn set_tags(&mut self, key: String, tags: Option>) { - match tags { - Some(tags) => self.tags.insert(key, CachedTags::new(tags)), - None => self.tags.remove(&key), - }; - } -} - impl L7ListenerHandler for HttpListener { fn get_sticky_name(&self) -> &str { &self.config.sticky_name @@ -599,13 +580,9 @@ impl HttpProxy { .ok_or(ProxyError::NoListenerFound(front.address))? .borrow_mut(); - let hostname = front.hostname.to_owned(); - let tags = front.tags.to_owned(); - listener .add_http_front(front) .map_err(ProxyError::AddFrontend)?; - listener.set_tags(hostname, tags); Ok(()) } @@ -624,13 +601,10 @@ impl HttpProxy { .ok_or(ProxyError::NoListenerFound(front.address))? .borrow_mut(); - let hostname = front.hostname.to_owned(); - listener .remove_http_front(front) .map_err(ProxyError::RemoveFrontend)?; - listener.set_tags(hostname, None); Ok(()) } @@ -693,7 +667,6 @@ impl HttpListener { config, fronts: Router::new(), listener: None, - tags: BTreeMap::new(), token, }) } @@ -1012,6 +985,16 @@ pub mod testing { mod tests { extern crate tiny_http; + use std::{ + collections::BTreeMap, + io::{Read, Write}, + net::TcpStream, + str, + sync::{Arc, Barrier}, + thread, + time::Duration, + }; + use super::testing::start_http_worker; use super::*; use sozu_command::proto::command::{RedirectPolicy, RedirectScheme, SocketAddress}; @@ -1023,15 +1006,6 @@ mod tests { response::{Backend, HttpFrontend}, }; - use std::{ - io::{Read, Write}, - net::TcpStream, - str, - sync::{Arc, Barrier}, - thread, - time::Duration, - }; - /* #[test] #[cfg(target_pointer_width = "64")] @@ -1370,7 +1344,6 @@ mod tests { config: default_config, token: Token(0), active: true, - tags: BTreeMap::new(), }; let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get); @@ -1379,20 +1352,20 @@ mod tests { let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get); let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get); assert_eq!( - frontend1.expect("should find frontend"), - RouteResult::forward("cluster_1".to_string()) + frontend1.expect("should find frontend").cluster_id, + Some("cluster_1".to_string()) ); assert_eq!( - frontend2.expect("should find frontend"), - RouteResult::forward("cluster_1".to_string()) + frontend2.expect("should find frontend").cluster_id, + Some("cluster_1".to_string()) ); assert_eq!( - frontend3.expect("should find frontend"), - RouteResult::forward("cluster_2".to_string()) + frontend3.expect("should find frontend").cluster_id, + Some("cluster_2".to_string()) ); assert_eq!( - frontend4.expect("should find frontend"), - RouteResult::forward("cluster_3".to_string()) + frontend4.expect("should find frontend").cluster_id, + Some("cluster_3".to_string()) ); assert!(frontend5.is_err()); } diff --git a/lib/src/https.rs b/lib/src/https.rs index 58e632ef3..c699df0df 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -1,6 +1,6 @@ use std::{ cell::RefCell, - collections::{hash_map::Entry, BTreeMap, HashMap}, + collections::{hash_map::Entry, HashMap}, io::ErrorKind, net::{Shutdown, SocketAddr as StdSocketAddr}, os::unix::io::AsRawFd, @@ -64,9 +64,9 @@ use crate::{ timer::TimeoutContainer, tls::MutexCertificateResolver, util::UnwrapLog, - AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, - ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, - SessionMetrics, SessionResult, StateMachineBuilder, StateResult, + AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, Protocol, + ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, SessionMetrics, + SessionResult, StateMachineBuilder, StateResult, }; // const SERVER_PROTOS: &[&str] = &["http/1.1", "h2"]; @@ -83,7 +83,7 @@ StateMachineBuilder! { Expect(ExpectProxyProtocol, ServerConnection), Handshake(TlsHandshake), Http(Http), - WebSocket(Pipe), + WebSocket(Pipe), Http2(Http2) -> todo!("H2"), } } @@ -369,11 +369,11 @@ impl HttpsSession { http.request_stream.storage.buffer, front_token, http.frontend_socket, - self.listener.clone(), Protocol::HTTPS, http.context.id, http.context.session_address, websocket_context, + http.context.tags, ); pipe.frontend_readiness.event = http.frontend_readiness.event; @@ -391,10 +391,7 @@ impl HttpsSession { todo!() } - fn upgrade_websocket( - &self, - wss: Pipe, - ) -> Option { + fn upgrade_websocket(&self, wss: Pipe) -> Option { // what do we do here? error!("Upgrade called on WSS, this should not happen"); Some(HttpsStateMachine::WebSocket(wss)) @@ -533,27 +530,9 @@ pub struct HttpsListener { listener: Option, resolver: Arc, rustls_details: Arc, - tags: BTreeMap, token: Token, } -impl ListenerHandler for HttpsListener { - fn get_addr(&self) -> &StdSocketAddr { - &self.address - } - - fn get_tags(&self, key: &str) -> Option<&CachedTags> { - self.tags.get(key) - } - - fn set_tags(&mut self, key: String, tags: Option>) { - match tags { - Some(tags) => self.tags.insert(key, CachedTags::new(tags)), - None => self.tags.remove(&key), - }; - } -} - impl L7ListenerHandler for HttpsListener { fn get_sticky_name(&self) -> &str { &self.config.sticky_name @@ -597,7 +576,6 @@ impl HttpsListener { )?)), config, token, - tags: BTreeMap::new(), }) } @@ -1015,7 +993,6 @@ impl HttpsProxy { .ok_or(ProxyError::NoListenerFound(front.address))? .borrow_mut(); - listener.set_tags(front.hostname.to_owned(), front.tags.to_owned()); listener .add_https_front(front) .map_err(ProxyError::AddFrontend)?; @@ -1040,7 +1017,6 @@ impl HttpsProxy { .ok_or(ProxyError::NoListenerFound(front.address))? .borrow_mut(); - listener.set_tags(front.hostname.to_owned(), None); listener .remove_https_front(front) .map_err(ProxyError::RemoveFrontend)?; @@ -1470,11 +1446,11 @@ pub mod testing { mod tests { use super::*; - use std::sync::Arc; + use std::{collections::BTreeMap, sync::Arc}; use sozu_command::{config::ListenerBuilder, proto::command::SocketAddress}; - use crate::router::{pattern_trie::TrieNode, MethodRule, PathRule, Route, Router}; + use crate::router::{pattern_trie::TrieNode, Frontend, MethodRule, PathRule, Router}; /* #[test] @@ -1507,25 +1483,25 @@ mod tests { "lolcatho.st".as_bytes(), &PathRule::Prefix(uri1), &MethodRule::new(None), - &Route::forward(cluster_id1.clone()) + &Frontend::forward(cluster_id1.clone()) )); assert!(fronts.add_tree_rule( "lolcatho.st".as_bytes(), &PathRule::Prefix(uri2), &MethodRule::new(None), - &Route::forward(cluster_id2) + &Frontend::forward(cluster_id2) )); assert!(fronts.add_tree_rule( "lolcatho.st".as_bytes(), &PathRule::Prefix(uri3), &MethodRule::new(None), - &Route::forward(cluster_id3) + &Frontend::forward(cluster_id3) )); assert!(fronts.add_tree_rule( "other.domain".as_bytes(), &PathRule::Prefix("test".to_string()), &MethodRule::new(None), - &Route::forward(cluster_id1) + &Frontend::forward(cluster_id1) )); let address = SocketAddress::new_v4(127, 0, 0, 1, 1032); @@ -1557,32 +1533,31 @@ mod tests { config: default_config, token: Token(0), active: true, - tags: BTreeMap::new(), }; println!("TEST {}", line!()); let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get); assert_eq!( - frontend1.expect("should find a frontend"), - RouteResult::forward("cluster_1".to_string()) + frontend1.expect("should find a frontend").cluster_id, + Some("cluster_1".to_string()) ); println!("TEST {}", line!()); let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get); assert_eq!( - frontend2.expect("should find a frontend"), - RouteResult::forward("cluster_1".to_string()) + frontend2.expect("should find a frontend").cluster_id, + Some("cluster_1".to_string()) ); println!("TEST {}", line!()); let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get); assert_eq!( - frontend3.expect("should find a frontend"), - RouteResult::forward("cluster_2".to_string()) + frontend3.expect("should find a frontend").cluster_id, + Some("cluster_2".to_string()) ); println!("TEST {}", line!()); let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get); assert_eq!( - frontend4.expect("should find a frontend"), - RouteResult::forward("cluster_3".to_string()) + frontend4.expect("should find a frontend").cluster_id, + Some("cluster_3".to_string()) ); println!("TEST {}", line!()); let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index eb858aafe..bd90b325d 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -519,16 +519,10 @@ macro_rules! StateMachineBuilder { } } -pub trait ListenerHandler { - fn get_addr(&self) -> &SocketAddr; +pub trait L4ListenerHandler { + fn get_tags(&self) -> Option<&CachedTags>; - fn get_tags(&self, key: &str) -> Option<&CachedTags>; - - fn get_concatenated_tags(&self, key: &str) -> Option<&str> { - self.get_tags(key).map(|tags| tags.concatenated.as_str()) - } - - fn set_tags(&mut self, key: String, tags: Option>); + fn set_tags(&mut self, tags: Option>); } #[derive(thiserror::Error, Debug)] diff --git a/lib/src/protocol/kawa_h1/editor.rs b/lib/src/protocol/kawa_h1/editor.rs index 479789f03..ba669ffea 100644 --- a/lib/src/protocol/kawa_h1/editor.rs +++ b/lib/src/protocol/kawa_h1/editor.rs @@ -6,6 +6,7 @@ use std::{ use rusty_ulid::Ulid; use sha2::{Digest, Sha256}; +use sozu_command::logging::CachedTags; use crate::{ pool::Checkout, @@ -16,6 +17,13 @@ use crate::{ use sozu_command_lib::logging::LogContext; +#[derive(Debug)] +pub struct HttpRoute { + pub method: Option, + pub authority: Option, + pub path: Option, +} + /// This is the container used to store and use information about the session from within a Kawa parser callback #[derive(Debug)] pub struct HttpContext { @@ -30,16 +38,9 @@ pub struct HttpContext { pub authorization_found: Option, /// position of the last header of the request (the "Sozu-Id"), only valid until prepare is called pub last_header: Option, - // ---------- Status Line - /// the value of the method in the request line - pub method: Option, - /// the value of the authority of the request (in the request line of "Host" header) - pub authority: Option, - /// the value of the path in the request line - pub path: Option, - /// the value of the status code in the response line + // ---------- Route + pub route: HttpRoute, pub status: Option, - /// the value of the reason in the response line pub reason: Option, // ---------- Additional optional data pub user_agent: Option, @@ -64,6 +65,8 @@ pub struct HttpContext { pub sticky_session: Option, /// Headers to add to response pub headers_response: Rc<[HeaderEdit]>, + /// tags of the contacted frontend + pub tags: Option>, } impl kawa::h1::ParserCallbacks for HttpContext { @@ -105,12 +108,12 @@ impl HttpContext { .. } = &request.detached.status_line { - self.method = method.data_opt(buf).map(Method::new); - self.authority = authority + self.route.method = method.data_opt(buf).map(Method::new); + self.route.authority = authority .data_opt(buf) .and_then(|data| from_utf8(data).ok()) .map(ToOwned::to_owned); - self.path = path + self.route.path = path .data_opt(buf) .and_then(|data| from_utf8(data).ok()) .map(ToOwned::to_owned); @@ -172,7 +175,7 @@ impl HttpContext { incr!("http.trusting.x_proto.diff"); debug!( "Trusting X-Forwarded-Proto for {:?} even though {:?} != {}", - self.authority, val, proto + self.route.authority, val, proto ); } } else if compare_no_case(key, b"X-Forwarded-Port") { @@ -185,7 +188,7 @@ impl HttpContext { incr!("http.trusting.x_port.diff"); debug!( "Trusting X-Forwarded-Port for {:?} even though {:?} != {}", - self.authority, val, expected + self.route.authority, val, expected ); } } else if compare_no_case(key, b"X-Forwarded-For") { @@ -326,7 +329,7 @@ impl HttpContext { .map(ToOwned::to_owned); } - if self.method == Some(Method::Head) { + if self.route.method == Some(Method::Head) { response.parsing_phase = kawa::ParsingPhase::Terminated; } @@ -379,9 +382,9 @@ impl HttpContext { self.keep_alive_backend = true; self.keep_alive_frontend = true; self.sticky_session_found = None; - self.method = None; - self.authority = None; - self.path = None; + self.route.method = None; + self.route.authority = None; + self.route.path = None; self.status = None; self.reason = None; self.user_agent = None; @@ -397,9 +400,11 @@ impl HttpContext { backend_id: self.backend_id.as_deref(), } } +} +impl HttpRoute { // -> host, path, method - pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> { + pub fn extract(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> { let given_method = self.method.as_ref().ok_or(RetrieveClusterError::NoMethod)?; let given_authority = self .authority diff --git a/lib/src/protocol/kawa_h1/mod.rs b/lib/src/protocol/kawa_h1/mod.rs index 3383836c4..8547634f7 100644 --- a/lib/src/protocol/kawa_h1/mod.rs +++ b/lib/src/protocol/kawa_h1/mod.rs @@ -13,7 +13,7 @@ use std::{ time::{Duration, Instant}, }; -use editor::apply_header_edits; +use editor::{apply_header_edits, HttpRoute}; use mio::{net::TcpStream, Interest, Token}; use parser::hostname_and_port; use rusty_ulid::Ulid; @@ -43,8 +43,8 @@ use crate::{ sozu_command::{logging::LogContext, ready::Ready}, timer::TimeoutContainer, AcceptError, BackendConnectAction, BackendConnectionError, FrontendFromRequestError, - L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness, - RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult, + L7ListenerHandler, L7Proxy, Protocol, ProxySession, Readiness, RetrieveClusterError, + SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult, }; /// This macro is defined uniquely in this module to help the tracking of kawa h1 @@ -156,7 +156,7 @@ impl Origin { } /// Http will be contained in State which itself is contained by Session -pub struct Http { +pub struct Http { answers: Rc>, /// The last origin server we tried to communicate with. /// It may be connected or connecting. It may be the server serving the current response, @@ -185,7 +185,7 @@ pub struct Http { pub context: HttpContext, } -impl Http { +impl Http { /// Instantiate a new HTTP SessionState with: /// /// - frontend_interest: READABLE | HUP | ERROR @@ -265,10 +265,13 @@ impl Http Http Http { +impl Http { fn log_endpoint(&self) -> EndpointRecord { EndpointRecord::Http { - method: self.context.method.as_deref(), - authority: self.context.authority.as_deref(), - path: self.context.path.as_deref(), + method: self.context.route.method.as_deref(), + authority: self.context.route.authority.as_deref(), + path: self.context.route.path.as_deref(), reason: self.context.reason.as_deref(), status: self.context.status, } @@ -933,24 +936,15 @@ impl Http WebSocketContext { WebSocketContext::Http { - method: self.context.method.clone(), - authority: self.context.authority.clone(), - path: self.context.path.clone(), + method: self.context.route.method.clone(), + authority: self.context.route.authority.clone(), + path: self.context.route.path.clone(), reason: self.context.reason.clone(), status: self.context.status, } } pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) { - let listener = self.listener.borrow(); - let tags = self.context.authority.as_ref().and_then(|host| { - let hostname = match host.split_once(':') { - None => host, - Some((hostname, _)) => hostname, - }; - listener.get_tags(hostname) - }); - let context = self.context.log_context(); metrics.register_end_of_session(&context); @@ -963,7 +957,7 @@ impl Http Http String { - if let Some(method) = &self.context.method { - if let Some(authority) = &self.context.authority { - if let Some(path) = &self.context.path { + if let Some(method) = &self.context.route.method { + if let Some(authority) = &self.context.route.authority { + if let Some(path) = &self.context.route.path { return format!("{method} {authority}{path}"); } return format!("{method} {authority}"); @@ -1214,7 +1208,7 @@ impl Http>, ) -> Result { - let (host, path, method) = match self.context.extract_route() { + let (host, path, method) = match self.context.route.extract() { Ok(tuple) => tuple, Err(cluster_error) => { self.set_answer(DefaultAnswer::Answer400 { @@ -1276,6 +1270,7 @@ impl Http route, Err(frontend_error) => { @@ -1283,8 +1278,12 @@ impl Http Http { + (_, RedirectPolicy::Permanent, _, true) => { let location = format!("{proto}://{host}{port}{path}"); - self.context.cluster_id = cluster_id; self.set_answer(DefaultAnswer::Answer301 { location }); Err(RetrieveClusterError::Redirected) } - (cluster_id, RedirectPolicy::Forward, Some(name), true) => { + (_, RedirectPolicy::Forward, Some(name), true) => { let location = format!("{proto}://{host}{port}{path}"); - self.context.cluster_id = cluster_id; self.set_answer(DefaultAnswer::AnswerCustom { name, location }); Err(RetrieveClusterError::Redirected) } @@ -1390,17 +1387,14 @@ impl Http { - self.context.cluster_id = cluster_id; + _ => { self.set_answer(DefaultAnswer::Answer401 { www_authenticate }); Err(RetrieveClusterError::UnauthorizedRoute) } @@ -1956,7 +1950,7 @@ impl Http SessionState for Http { +impl SessionState for Http { fn ready( &mut self, session: Rc>, diff --git a/lib/src/protocol/pipe.rs b/lib/src/protocol/pipe.rs index 63d9dffc0..cb1c7349c 100644 --- a/lib/src/protocol/pipe.rs +++ b/lib/src/protocol/pipe.rs @@ -4,7 +4,7 @@ use mio::{net::TcpStream, Token}; use rusty_ulid::Ulid; use sozu_command::{ config::MAX_LOOP_ITERATIONS, - logging::{EndpointRecord, LogContext}, + logging::{CachedTags, EndpointRecord, LogContext}, }; use crate::{ @@ -14,7 +14,7 @@ use crate::{ socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol}, sozu_command::ready::Ready, timer::TimeoutContainer, - L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult, + L7Proxy, Protocol, Readiness, SessionMetrics, SessionResult, StateResult, }; /// This macro is defined uniquely in this module to help the tracking of pipelining @@ -59,7 +59,7 @@ pub enum WebSocketContext { Tcp, } -pub struct Pipe { +pub struct Pipe { backend_buffer: Checkout, backend_id: Option, pub backend_readiness: Readiness, @@ -75,14 +75,14 @@ pub struct Pipe { frontend_status: ConnectionStatus, frontend_token: Token, frontend: Front, - listener: Rc>, protocol: Protocol, request_id: Ulid, session_address: Option, + tags: Option>, websocket_context: WebSocketContext, } -impl Pipe { +impl Pipe { /// Instantiate a new Pipe SessionState with: /// /// - frontend_interest: READABLE | WRITABLE | HUP | ERROR @@ -103,12 +103,12 @@ impl Pipe { frontend_buffer: Checkout, frontend_token: Token, frontend: Front, - listener: Rc>, protocol: Protocol, request_id: Ulid, session_address: Option, websocket_context: WebSocketContext, - ) -> Pipe { + tags: Option>, + ) -> Pipe { let frontend_status = ConnectionStatus::Normal; let backend_status = if backend_socket.is_none() { ConnectionStatus::Closed @@ -138,10 +138,10 @@ impl Pipe { frontend_status, frontend_token, frontend, - listener, protocol, request_id, session_address, + tags, websocket_context, }; @@ -232,7 +232,6 @@ impl Pipe { } pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) { - let listener = self.listener.borrow(); let context = self.log_context(); let endpoint = self.log_endpoint(); metrics.register_end_of_session(&context); @@ -245,7 +244,7 @@ impl Pipe { backend_address: self.get_backend_address(), protocol: self.protocol_string(), endpoint, - tags: listener.get_tags(&listener.get_addr().to_string()), + tags: self.tags.as_deref(), client_rtt: socket_rtt(self.front_socket()), server_rtt: self.backend_socket.as_ref().and_then(socket_rtt), service_time: metrics.service_time(), @@ -657,7 +656,7 @@ impl Pipe { } } -impl SessionState for Pipe { +impl SessionState for Pipe { fn ready( &mut self, _session: Rc>, diff --git a/lib/src/protocol/proxy_protocol/expect.rs b/lib/src/protocol/proxy_protocol/expect.rs index 031d1cdf9..fffab0fd3 100644 --- a/lib/src/protocol/proxy_protocol/expect.rs +++ b/lib/src/protocol/proxy_protocol/expect.rs @@ -3,7 +3,10 @@ use std::{cell::RefCell, rc::Rc}; use mio::{net::TcpStream, *}; use nom::{Err, HexDisplay}; use rusty_ulid::Ulid; -use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext}; +use sozu_command::{ + config::MAX_LOOP_ITERATIONS, + logging::{CachedTags, LogContext}, +}; use crate::{ pool::Checkout, @@ -13,7 +16,6 @@ use crate::{ }, socket::{SocketHandler, SocketResult}, sozu_command::ready::Ready, - tcp::TcpListener, timer::TimeoutContainer, Protocol, Readiness, SessionMetrics, StateResult, }; @@ -166,8 +168,8 @@ impl ExpectProxyProtocol { back_buf: Checkout, backend_socket: Option, backend_token: Option, - listener: Rc>, - ) -> Pipe { + tags: Option>, + ) -> Pipe { let addr = self.front_socket().peer_addr().ok(); let mut pipe = Pipe::new( @@ -181,11 +183,11 @@ impl ExpectProxyProtocol { front_buf, self.frontend_token, self.frontend, - listener, Protocol::TCP, self.request_id, addr, WebSocketContext::Tcp, + tags, ); pipe.frontend_readiness.event = self.frontend_readiness.event; diff --git a/lib/src/protocol/proxy_protocol/relay.rs b/lib/src/protocol/proxy_protocol/relay.rs index 594ea3819..c2705c746 100644 --- a/lib/src/protocol/proxy_protocol/relay.rs +++ b/lib/src/protocol/proxy_protocol/relay.rs @@ -1,8 +1,9 @@ -use std::{cell::RefCell, io::Write, rc::Rc}; +use std::{io::Write, rc::Rc}; use mio::{net::TcpStream, Token}; use nom::{Err, Offset}; use rusty_ulid::Ulid; +use sozu_command::logging::CachedTags; use crate::{ pool::Checkout, @@ -12,7 +13,6 @@ use crate::{ }, socket::{SocketHandler, SocketResult}, sozu_command::ready::Ready, - tcp::TcpListener, Protocol, Readiness, SessionMetrics, SessionResult, }; @@ -172,11 +172,7 @@ impl RelayProxyProtocol { self.backend_token = Some(token); } - pub fn into_pipe( - mut self, - back_buf: Checkout, - listener: Rc>, - ) -> Pipe { + pub fn into_pipe(mut self, back_buf: Checkout, tags: Option>) -> Pipe { let backend_socket = self.backend.take().unwrap(); let addr = self.front_socket().peer_addr().ok(); @@ -191,11 +187,11 @@ impl RelayProxyProtocol { self.frontend_buffer, self.frontend_token, self.frontend, - listener, Protocol::TCP, self.request_id, addr, WebSocketContext::Tcp, + tags, ); pipe.frontend_readiness.event = self.frontend_readiness.event; diff --git a/lib/src/protocol/proxy_protocol/send.rs b/lib/src/protocol/proxy_protocol/send.rs index 061982509..7ed666f65 100644 --- a/lib/src/protocol/proxy_protocol/send.rs +++ b/lib/src/protocol/proxy_protocol/send.rs @@ -1,11 +1,11 @@ use std::{ - cell::RefCell, io::{ErrorKind, Write}, rc::Rc, }; use mio::{net::TcpStream, Token}; use rusty_ulid::Ulid; +use sozu_command::logging::CachedTags; use crate::{ pool::Checkout, @@ -15,7 +15,6 @@ use crate::{ }, socket::SocketHandler, sozu_command::ready::Ready, - tcp::TcpListener, BackendConnectionStatus, Protocol, Readiness, SessionMetrics, SessionResult, }; @@ -156,8 +155,8 @@ impl SendProxyProtocol { mut self, front_buf: Checkout, back_buf: Checkout, - listener: Rc>, - ) -> Pipe { + tags: Option>, + ) -> Pipe { let backend_socket = self.backend.take().unwrap(); let addr = self.front_socket().peer_addr().ok(); @@ -172,11 +171,11 @@ impl SendProxyProtocol { front_buf, self.frontend_token, self.frontend, - listener, Protocol::TCP, self.request_id, addr, WebSocketContext::Tcp, + tags, ); pipe.frontend_readiness = self.frontend_readiness; diff --git a/lib/src/router/mod.rs b/lib/src/router/mod.rs index b64b0f6eb..0e8ce2a29 100644 --- a/lib/src/router/mod.rs +++ b/lib/src/router/mod.rs @@ -12,9 +12,10 @@ use pattern_trie::{TrieMatches, TrieSubMatch}; use regex::bytes::Regex; use sozu_command::{ + logging::CachedTags, proto::command::{ - Header, PathRule as CommandPathRule, PathRuleKind, Position, RedirectPolicy, - RedirectScheme, RulePosition, + PathRule as CommandPathRule, PathRuleKind, Position, RedirectPolicy, RedirectScheme, + RulePosition, }, response::HttpFrontend, state::ClusterId, @@ -45,9 +46,9 @@ pub enum RouterError { } pub struct Router { - pre: Vec<(DomainRule, PathRule, MethodRule, Route)>, - pub tree: TrieNode>, - post: Vec<(DomainRule, PathRule, MethodRule, Route)>, + pre: Vec<(DomainRule, PathRule, MethodRule, Frontend)>, + pub tree: TrieNode>, + post: Vec<(DomainRule, PathRule, MethodRule, Frontend)>, } impl Default for Router { @@ -169,19 +170,7 @@ impl Router { let method_rule = MethodRule::new(front.method.clone()); - let route = Route::new( - front.cluster_id.clone(), - &domain_rule, - &path_rule, - front.required_auth, - front.redirect, - front.redirect_scheme, - front.redirect_template.clone(), - front.rewrite_host.clone(), - front.rewrite_path.clone(), - front.rewrite_port, - &front.headers, - )?; + let route = Frontend::new(&domain_rule, &path_rule, front)?; let success = match front.position { RulePosition::Pre => self.add_pre_rule(&domain_rule, &path_rule, &method_rule, &route), @@ -236,7 +225,7 @@ impl Router { hostname: &[u8], path: &PathRule, method: &MethodRule, - cluster: &Route, + cluster: &Frontend, ) -> bool { let hostname = match from_utf8(hostname) { Err(_) => return false, @@ -313,7 +302,7 @@ impl Router { domain: &DomainRule, path: &PathRule, method: &MethodRule, - cluster_id: &Route, + cluster_id: &Frontend, ) -> bool { if !self .pre @@ -337,7 +326,7 @@ impl Router { domain: &DomainRule, path: &PathRule, method: &MethodRule, - cluster_id: &Route, + cluster_id: &Frontend, ) -> bool { if !self .post @@ -741,7 +730,7 @@ impl Debug for HeaderEdit { /// What to do with the traffic /// TODO: tags should be moved here #[derive(Debug, Clone)] -pub struct Route { +pub struct Frontend { cluster_id: Option, required_auth: bool, redirect: RedirectPolicy, @@ -754,22 +743,29 @@ pub struct Route { rewrite_port: Option, headers_request: Rc<[HeaderEdit]>, headers_response: Rc<[HeaderEdit]>, + tags: Option>, } -impl Route { +impl Frontend { pub fn new( - cluster_id: Option, domain_rule: &DomainRule, path_rule: &PathRule, - required_auth: bool, - redirect: RedirectPolicy, - redirect_scheme: RedirectScheme, - redirect_template: Option, - rewrite_host: Option, - rewrite_path: Option, - rewrite_port: Option, - headers: &[Header], + front: &HttpFrontend, ) -> Result { + let cluster_id = front.cluster_id.clone(); + let required_auth = front.required_auth; + let rewrite_port = front.rewrite_port; + let rewrite_path = front.rewrite_path.clone(); + let rewrite_host = front.rewrite_host.clone(); + let redirect = front.redirect; + let redirect_scheme = front.redirect_scheme; + let redirect_template = front.redirect_template.clone(); + let headers = &front.headers; + let tags = front + .tags + .clone() + .map(|tags| Rc::new(CachedTags::new(tags))); + let deny = match (&cluster_id, redirect, &redirect_template, required_auth) { (_, RedirectPolicy::Unauthorized, _, false) => true, (_, RedirectPolicy::Unauthorized, _, true) => { @@ -791,7 +787,7 @@ impl Route { }; if deny { return Ok(Self { - cluster_id, + cluster_id: cluster_id.clone(), required_auth, redirect: RedirectPolicy::Unauthorized, redirect_scheme, @@ -803,6 +799,7 @@ impl Route { rewrite_port: None, headers_request: Rc::new([]), headers_response: Rc::new([]), + tags: None, }); } let mut capture_cap_host = match domain_rule { @@ -867,7 +864,7 @@ impl Route { }); } } - Ok(Route { + Ok(Frontend { cluster_id, required_auth, redirect, @@ -880,6 +877,7 @@ impl Route { rewrite_port, headers_request: headers_request.into(), headers_response: headers_response.into(), + tags, }) } @@ -898,11 +896,12 @@ impl Route { rewrite_port: None, headers_request: Rc::new([]), headers_response: Rc::new([]), + tags: None, } } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct RouteResult { pub cluster_id: Option, pub required_auth: bool, @@ -914,6 +913,7 @@ pub struct RouteResult { pub rewritten_port: Option, pub headers_request: Rc<[HeaderEdit]>, pub headers_response: Rc<[HeaderEdit]>, + pub tags: Option>, } impl RouteResult { @@ -929,22 +929,7 @@ impl RouteResult { rewritten_port: None, headers_request: Rc::new([]), headers_response: Rc::new([]), - } - } - - #[cfg(test)] - pub fn forward(cluster_id: ClusterId) -> Self { - Self { - cluster_id: Some(cluster_id), - required_auth: false, - redirect: RedirectPolicy::Forward, - redirect_scheme: RedirectScheme::UseSame, - redirect_template: None, - rewritten_host: None, - rewritten_path: None, - rewritten_port: None, - headers_request: Rc::new([]), - headers_response: Rc::new([]), + tags: None, } } @@ -952,9 +937,9 @@ impl RouteResult { captures_host: Vec<&'a str>, path: &'a [u8], path_rule: &PathRule, - route: &Route, + route: &Frontend, ) -> Self { - let Route { + let Frontend { cluster_id, required_auth, redirect, @@ -966,6 +951,7 @@ impl RouteResult { rewrite_port, headers_request, headers_response, + tags, .. } = route; let mut captures_path = Vec::with_capacity(*capture_cap_path); @@ -1002,6 +988,7 @@ impl RouteResult { rewritten_port: *rewrite_port, headers_request: headers_request.clone(), headers_response: headers_response.clone(), + tags: tags.clone(), } } fn new_no_trie<'a>( @@ -1009,9 +996,9 @@ impl RouteResult { domain_rule: &DomainRule, path: &'a [u8], path_rule: &PathRule, - route: &Route, + route: &Frontend, ) -> Self { - let Route { + let Frontend { cluster_id, redirect, capture_cap_host, @@ -1044,9 +1031,9 @@ impl RouteResult { domain_submatches: TrieMatches<'_, 'a>, path: &'a [u8], path_rule: &PathRule, - route: &Route, + route: &Frontend, ) -> Self { - let Route { + let Frontend { cluster_id, redirect, capture_cap_host, @@ -1194,27 +1181,33 @@ mod tests { b"*.sozu.io", &PathRule::Prefix("".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("base".to_string()) + &Frontend::forward("base".to_string()) )); println!("{:#?}", router.tree); assert_eq!( - router.lookup("www.sozu.io", "/api", &Method::Get), - Ok(RouteResult::forward("base".to_string())) + router + .lookup("www.sozu.io", "/api", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("base".to_string())) ); assert!(router.add_tree_rule( b"*.sozu.io", &PathRule::Prefix("/api".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("api".to_string()) + &Frontend::forward("api".to_string()) )); println!("{:#?}", router.tree); assert_eq!( - router.lookup("www.sozu.io", "/ap", &Method::Get), - Ok(RouteResult::forward("base".to_string())) + router + .lookup("www.sozu.io", "/ap", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("base".to_string())) ); assert_eq!( - router.lookup("www.sozu.io", "/api", &Method::Get), - Ok(RouteResult::forward("api".to_string())) + router + .lookup("www.sozu.io", "/api", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("api".to_string())) ); } @@ -1233,27 +1226,33 @@ mod tests { b"*.sozu.io", &PathRule::Prefix("".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("base".to_string()) + &Frontend::forward("base".to_string()) )); println!("{:#?}", router.tree); assert_eq!( - router.lookup("www.sozu.io", "/api", &Method::Get), - Ok(RouteResult::forward("base".to_string())) + router + .lookup("www.sozu.io", "/api", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("base".to_string())) ); assert!(router.add_tree_rule( b"api.sozu.io", &PathRule::Prefix("".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("api".to_string()) + &Frontend::forward("api".to_string()) )); println!("{:#?}", router.tree); assert_eq!( - router.lookup("www.sozu.io", "/api", &Method::Get), - Ok(RouteResult::forward("base".to_string())) + router + .lookup("www.sozu.io", "/api", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("base".to_string())) ); assert_eq!( - router.lookup("api.sozu.io", "/api", &Method::Get), - Ok(RouteResult::forward("api".to_string())) + router + .lookup("api.sozu.io", "/api", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("api".to_string())) ); } @@ -1265,23 +1264,27 @@ mod tests { b"www./.*/.io", &PathRule::Prefix("".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("base".to_string()) + &Frontend::forward("base".to_string()) )); println!("{:#?}", router.tree); assert!(router.add_tree_rule( b"www.doc./.*/.io", &PathRule::Prefix("".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("doc".to_string()) + &Frontend::forward("doc".to_string()) )); println!("{:#?}", router.tree); assert_eq!( - router.lookup("www.sozu.io", "/", &Method::Get), - Ok(RouteResult::forward("base".to_string())) + router + .lookup("www.sozu.io", "/", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("base".to_string())) ); assert_eq!( - router.lookup("www.doc.sozu.io", "/", &Method::Get), - Ok(RouteResult::forward("doc".to_string())) + router + .lookup("www.doc.sozu.io", "/", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("doc".to_string())) ); assert!(router.remove_tree_rule( b"www./.*/.io", @@ -1291,8 +1294,10 @@ mod tests { println!("{:#?}", router.tree); assert!(router.lookup("www.sozu.io", "/", &Method::Get).is_err()); assert_eq!( - router.lookup("www.doc.sozu.io", "/", &Method::Get), - Ok(RouteResult::forward("doc".to_string())) + router + .lookup("www.doc.sozu.io", "/", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("doc".to_string())) ); } @@ -1304,53 +1309,57 @@ mod tests { &"*".parse::().unwrap(), &PathRule::Prefix("/.well-known/acme-challenge".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("acme".to_string()) + &Frontend::forward("acme".to_string()) )); assert!(router.add_tree_rule( "www.example.com".as_bytes(), &PathRule::Prefix("/".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("example".to_string()) + &Frontend::forward("example".to_string()) )); assert!(router.add_tree_rule( "*.test.example.com".as_bytes(), &PathRule::Regex(Regex::new("/hello[A-Z]+/").unwrap()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("examplewildcard".to_string()) + &Frontend::forward("examplewildcard".to_string()) )); assert!(router.add_tree_rule( "/test[0-9]/.example.com".as_bytes(), &PathRule::Prefix("/".to_string()), &MethodRule::new(Some("GET".to_string())), - &Route::forward("exampleregex".to_string()) + &Frontend::forward("exampleregex".to_string()) )); assert_eq!( - router.lookup("www.example.com", "/helloA", &Method::new(&b"GET"[..])), - Ok(RouteResult::forward("example".to_string())) + router + .lookup("www.example.com", "/helloA", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("example".to_string())) ); assert_eq!( - router.lookup( - "www.example.com", - "/.well-known/acme-challenge", - &Method::new(&b"GET"[..]) - ), - Ok(RouteResult::forward("acme".to_string())) + router + .lookup( + "www.example.com", + "/.well-known/acme-challenge", + &Method::Get + ) + .map(|r| r.cluster_id), + Ok(Some("acme".to_string())) ); assert!(router - .lookup("www.test.example.com", "/", &Method::new(&b"GET"[..])) + .lookup("www.test.example.com", "/", &Method::Get) .is_err()); assert_eq!( - router.lookup( - "www.test.example.com", - "/helloAB/", - &Method::new(&b"GET"[..]) - ), - Ok(RouteResult::forward("examplewildcard".to_string())) + router + .lookup("www.test.example.com", "/helloAB/", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("examplewildcard".to_string())) ); assert_eq!( - router.lookup("test1.example.com", "/helloAB/", &Method::new(&b"GET"[..])), - Ok(RouteResult::forward("exampleregex".to_string())) + router + .lookup("test1.example.com", "/helloAB/", &Method::Get) + .map(|r| r.cluster_id), + Ok(Some("exampleregex".to_string())) ); } } diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index 2a44341a3..90807f273 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -44,7 +44,7 @@ use crate::{ }, timer::TimeoutContainer, AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags, - ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, + L4ListenerHandler, ListenerError, Protocol, ProxyConfiguration, ProxyError, ProxySession, Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder, }; @@ -54,7 +54,7 @@ StateMachineBuilder! { /// 1. optional (ExpectProxyProtocol | SendProxyProtocol | RelayProxyProtocol) /// 2. Pipe enum TcpStateMachine { - Pipe(Pipe), + Pipe(Pipe), SendProxyProtocol(SendProxyProtocol), RelayProxyProtocol(RelayProxyProtocol), ExpectProxyProtocol(ExpectProxyProtocol), @@ -91,13 +91,14 @@ pub struct TcpSession { frontend_address: Option, frontend_buffer: Option, frontend_token: Token, - has_been_closed: SessionIsToBeClosed, + has_been_closed: bool, last_event: Instant, listener: Rc>, metrics: SessionMetrics, proxy: Rc>, request_id: Ulid, state: TcpStateMachine, + tags: Option>, } impl TcpSession { @@ -127,6 +128,8 @@ impl TcpSession { TimeoutContainer::new(configured_frontend_timeout, frontend_token); let container_backend_timeout = TimeoutContainer::new_empty(configured_connect_timeout); + let tags = listener.borrow().tags.clone(); + let state = match proxy_protocol { Some(ProxyProtocolConfig::RelayHeader) => { backend_buffer_session = Some(backend_buffer); @@ -174,11 +177,11 @@ impl TcpSession { frontend_buffer, frontend_token, socket, - listener.clone(), Protocol::TCP, request_id, frontend_address, WebSocketContext::Tcp, + tags.clone(), ); pipe.set_cluster_id(cluster_id.clone()); TcpStateMachine::Pipe(pipe) @@ -209,11 +212,11 @@ impl TcpSession { proxy, request_id, state, + tags, } } fn log_request(&self) { - let listener = self.listener.borrow(); let context = self.log_context(); self.metrics.register_end_of_session(&context); info_access!( @@ -224,7 +227,7 @@ impl TcpSession { backend_address: None, protocol: "TCP", endpoint: EndpointRecord::Tcp, - tags: listener.get_tags(&listener.get_addr().to_string()), + tags: self.tags.as_deref(), client_rtt: socket_rtt(self.state.front_socket()), server_rtt: None, user_agent: None, @@ -364,7 +367,7 @@ impl TcpSession { let mut pipe = send_proxy_protocol.into_pipe( self.frontend_buffer.take().unwrap(), self.backend_buffer.take().unwrap(), - self.listener.clone(), + self.tags.clone(), ); pipe.set_cluster_id(self.cluster_id.clone()); @@ -382,8 +385,7 @@ impl TcpSession { fn upgrade_relay(&mut self, rpp: RelayProxyProtocol) -> Option { if self.backend_buffer.is_some() { - let mut pipe = - rpp.into_pipe(self.backend_buffer.take().unwrap(), self.listener.clone()); + let mut pipe = rpp.into_pipe(self.backend_buffer.take().unwrap(), self.tags.clone()); pipe.set_cluster_id(self.cluster_id.clone()); gauge_add!("protocol.proxy.relay", -1); gauge_add!("protocol.tcp", 1); @@ -407,7 +409,7 @@ impl TcpSession { self.backend_buffer.take().unwrap(), None, None, - self.listener.clone(), + self.tags.clone(), ); pipe.set_cluster_id(self.cluster_id.clone()); @@ -1089,29 +1091,22 @@ impl ProxySession for TcpSession { } pub struct TcpListener { - active: SessionIsToBeClosed, + active: bool, address: SocketAddr, cluster_id: Option, config: TcpListenerConfig, listener: Option, - tags: BTreeMap, + pub tags: Option>, token: Token, } -impl ListenerHandler for TcpListener { - fn get_addr(&self) -> &SocketAddr { - &self.address - } - - fn get_tags(&self, key: &str) -> Option<&CachedTags> { - self.tags.get(key) +impl L4ListenerHandler for TcpListener { + fn get_tags(&self) -> Option<&CachedTags> { + self.tags.as_deref() } - fn set_tags(&mut self, key: String, tags: Option>) { - match tags { - Some(tags) => self.tags.insert(key, CachedTags::new(tags)), - None => self.tags.remove(&key), - }; + fn set_tags(&mut self, tags: Option>) { + self.tags = tags.map(|tags| Rc::new(CachedTags::new(tags))) } } @@ -1124,7 +1119,7 @@ impl TcpListener { address: config.address.into(), config, active: false, - tags: BTreeMap::new(), + tags: None, }) } @@ -1224,7 +1219,7 @@ impl TcpProxy { } } - pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed { + pub fn remove_listener(&mut self, address: SocketAddr) -> bool { let len = self.listeners.len(); self.listeners.retain(|_, l| l.borrow().address != address); @@ -1291,7 +1286,7 @@ impl TcpProxy { self.fronts .insert(front.cluster_id.to_string(), listener.token); - listener.set_tags(address.to_string(), Some(front.tags)); + listener.set_tags(Some(front.tags)); listener.cluster_id = Some(front.cluster_id); Ok(()) } @@ -1308,7 +1303,7 @@ impl TcpProxy { None => return Err(ProxyError::NoListenerFound(address)), }; - listener.set_tags(address.to_string(), None); + listener.set_tags(None); if let Some(cluster_id) = listener.cluster_id.take() { self.fronts.remove(&cluster_id); }