diff --git a/Cargo.lock b/Cargo.lock
index fadb0c1e..14a9fefe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -377,7 +377,7 @@ dependencies = [
  "mimalloc",
  "pem",
  "percent-encoding",
- "pin-project",
+ "pin-project-lite",
  "pkcs8",
  "pyo3",
  "pyo3-build-config",
@@ -710,26 +710,6 @@ version = "2.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 
-[[package]]
-name = "pin-project"
-version = "1.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dfe2e71e1471fe07709406bf725f710b02927c9c54b2b5b2ec0e8087d97c327d"
-dependencies = [
- "pin-project-internal",
-]
-
-[[package]]
-name = "pin-project-internal"
-version = "1.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
 [[package]]
 name = "pin-project-lite"
 version = "0.2.16"
diff --git a/Cargo.toml b/Cargo.toml
index 54e7dc14..d7a8b0c1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,7 +43,7 @@ log = "0.4"
 mimalloc = { version = "0.1.43", default-features = false, features = ["local_dynamic_tls"], optional = true }
 pem = "=3.0"
 percent-encoding = "=2.3"
-pin-project = "1.1"
+pin-project-lite = "=0.2"
 pkcs8 = { version = "=0.10", features = ["encryption", "pkcs5"] }
 pyo3 = { version = "=0.23", features = ["anyhow", "extension-module", "generate-import-lib"] }
 pyo3-log = "=0.12"
diff --git a/README.md b/README.md
index 5ec4ecb0..032c01b0 100644
--- a/README.md
+++ b/README.md
@@ -141,10 +141,15 @@ Options:
                                   GRANIAN_BACKPRESSURE; default:
                                   (backlog/workers); x>=1]
   --http1-buffer-size INTEGER RANGE
-                                  Set the maximum buffer size for HTTP/1
+                                  Sets the maximum buffer size for HTTP/1
                                   connections  [env var:
                                   GRANIAN_HTTP1_BUFFER_SIZE; default: 417792;
                                   x>=8192]
+  --http1-header-read-timeout INTEGER RANGE
+                                  Sets a timeout (in milliseconds) to read
+                                  headers  [env var:
+                                  GRANIAN_HTTP1_HEADER_READ_TIMEOUT; default:
+                                  30000; 1<=x<=60000]
   --http1-keep-alive / --no-http1-keep-alive
                                   Enables or disables HTTP/1 keep-alive  [env
                                   var: GRANIAN_HTTP1_KEEP_ALIVE; default:
@@ -159,42 +164,46 @@
                                   for HTTP2  [env var:
                                   GRANIAN_HTTP2_ADAPTIVE_WINDOW; default:
                                   (disabled)]
-  --http2-initial-connection-window-size INTEGER
+  --http2-initial-connection-window-size INTEGER RANGE
                                   Sets the max connection-level flow control
                                   for HTTP2  [env var: GRANIAN_HTTP2_INITIAL_C
-                                  ONNECTION_WINDOW_SIZE; default: 1048576]
-  --http2-initial-stream-window-size INTEGER
+                                  ONNECTION_WINDOW_SIZE; default: 1048576;
+                                  x>=1024]
+  --http2-initial-stream-window-size INTEGER RANGE
                                   Sets the `SETTINGS_INITIAL_WINDOW_SIZE`
                                   option for HTTP2 stream-level flow control
                                   [env var:
                                   GRANIAN_HTTP2_INITIAL_STREAM_WINDOW_SIZE;
-                                  default: 1048576]
-  --http2-keep-alive-interval INTEGER
-                                  Sets an interval for HTTP2 Ping frames
-                                  should be sent to keep a connection alive
-                                  [env var: GRANIAN_HTTP2_KEEP_ALIVE_INTERVAL]
-  --http2-keep-alive-timeout INTEGER
-                                  Sets a timeout for receiving an
+                                  default: 1048576; x>=1024]
+  --http2-keep-alive-interval INTEGER RANGE
+                                  Sets an interval (in milliseconds) for HTTP2
+                                  Ping frames should be sent to keep a
+                                  connection alive  [env var:
+                                  GRANIAN_HTTP2_KEEP_ALIVE_INTERVAL;
+                                  1<=x<=60000]
+  --http2-keep-alive-timeout INTEGER RANGE
+                                  Sets a timeout (in seconds) for receiving an
                                   acknowledgement of the HTTP2 keep-alive ping
                                   [env var: GRANIAN_HTTP2_KEEP_ALIVE_TIMEOUT;
-                                  default: 20]
-  --http2-max-concurrent-streams INTEGER
+                                  default: 20; x>=1]
+  --http2-max-concurrent-streams INTEGER RANGE
                                   Sets the SETTINGS_MAX_CONCURRENT_STREAMS
                                   option for HTTP2 connections  [env var:
                                   GRANIAN_HTTP2_MAX_CONCURRENT_STREAMS;
-                                  default: 200]
-  --http2-max-frame-size INTEGER  Sets the maximum frame size to use for HTTP2
+                                  default: 200; x>=10]
+  --http2-max-frame-size INTEGER RANGE
+                                  Sets the maximum frame size to use for HTTP2
                                   [env var: GRANIAN_HTTP2_MAX_FRAME_SIZE;
-                                  default: 16384]
-  --http2-max-headers-size INTEGER
+                                  default: 16384; x>=1024]
+  --http2-max-headers-size INTEGER RANGE
                                   Sets the max size of received header frames
                                   [env var: GRANIAN_HTTP2_MAX_HEADERS_SIZE;
-                                  default: 16777216]
-  --http2-max-send-buffer-size INTEGER
+                                  default: 16777216; x>=1]
+  --http2-max-send-buffer-size INTEGER RANGE
                                   Set the maximum write buffer size for each
                                   HTTP/2 stream  [env var:
                                   GRANIAN_HTTP2_MAX_SEND_BUFFER_SIZE; default:
-                                  409600]
+                                  409600; x>=1024]
   --log / --no-log                Enable logging  [env var:
                                   GRANIAN_LOG_ENABLED; default: (enabled)]
   --log-level [critical|error|warning|warn|info|debug|notset]
diff --git a/granian/cli.py b/granian/cli.py
index 9f8ca544..12dbfcd7 100644
--- a/granian/cli.py
+++ b/granian/cli.py
@@ -110,7 +110,13 @@ def option(*param_decls: str, cls: Optional[Type[click.Option]] = None, **attrs:
     '--http1-buffer-size',
     type=click.IntRange(8192),
     default=HTTP1Settings.max_buffer_size,
-    help='Set the maximum buffer size for HTTP/1 connections',
+    help='Sets the maximum buffer size for HTTP/1 connections',
+)
+@option(
+    '--http1-header-read-timeout',
+    type=click.IntRange(1, 60_000),
+    default=HTTP1Settings.header_read_timeout,
+    help='Sets a timeout (in milliseconds) to read headers',
 )
 @option(
     '--http1-keep-alive/--no-http1-keep-alive',
@@ -129,49 +135,49 @@ def option(*param_decls: str, cls: Optional[Type[click.Option]] = None, **attrs:
 )
 @option(
     '--http2-initial-connection-window-size',
-    type=int,
+    type=click.IntRange(1024),
     default=HTTP2Settings.initial_connection_window_size,
     help='Sets the max connection-level flow control for HTTP2',
 )
 @option(
     '--http2-initial-stream-window-size',
-    type=int,
+    type=click.IntRange(1024),
     default=HTTP2Settings.initial_stream_window_size,
     help='Sets the `SETTINGS_INITIAL_WINDOW_SIZE` option for HTTP2 stream-level flow control',
 )
 @option(
     '--http2-keep-alive-interval',
-    type=int,
+    type=click.IntRange(1, 60_000),
     default=HTTP2Settings.keep_alive_interval,
-    help='Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive',
+    help='Sets an interval (in milliseconds) for HTTP2 Ping frames should be sent to keep a connection alive',
 )
 @option(
     '--http2-keep-alive-timeout',
-    type=int,
+    type=click.IntRange(1),
     default=HTTP2Settings.keep_alive_timeout,
-    help='Sets a timeout for receiving an acknowledgement of the HTTP2 keep-alive ping',
+    help='Sets a timeout (in seconds) for receiving an acknowledgement of the HTTP2 keep-alive ping',
 )
 @option(
     '--http2-max-concurrent-streams',
-    type=int,
+    type=click.IntRange(10),
     default=HTTP2Settings.max_concurrent_streams,
     help='Sets the SETTINGS_MAX_CONCURRENT_STREAMS option for HTTP2 connections',
 )
 @option(
     '--http2-max-frame-size',
-    type=int,
+    type=click.IntRange(1024),
     default=HTTP2Settings.max_frame_size,
     help='Sets the maximum frame size to use for HTTP2',
 )
 @option(
     '--http2-max-headers-size',
-    type=int,
+    type=click.IntRange(1),
     default=HTTP2Settings.max_headers_size,
     help='Sets the max size of received header frames',
 )
 @option(
     '--http2-max-send-buffer-size',
-    type=int,
+    type=click.IntRange(1024),
     default=HTTP2Settings.max_send_buffer_size,
     help='Set the maximum write buffer size for each HTTP/2 stream',
 )
@@ -284,6 +290,7 @@ def cli(
     backlog: int,
     backpressure: Optional[int],
     http1_buffer_size: int,
+    http1_header_read_timeout: int,
     http1_keep_alive: bool,
     http1_pipeline_flush: bool,
     http2_adaptive_window: bool,
@@ -344,7 +351,10 @@ def cli(
         backlog=backlog,
         backpressure=backpressure,
         http1_settings=HTTP1Settings(
-            keep_alive=http1_keep_alive, max_buffer_size=http1_buffer_size, pipeline_flush=http1_pipeline_flush
+            header_read_timeout=http1_header_read_timeout,
+            keep_alive=http1_keep_alive,
+            max_buffer_size=http1_buffer_size,
+            pipeline_flush=http1_pipeline_flush,
         ),
         http2_settings=HTTP2Settings(
             adaptive_window=http2_adaptive_window,
diff --git a/granian/http.py b/granian/http.py
index 0e3d37b1..09fd2695 100644
--- a/granian/http.py
+++ b/granian/http.py
@@ -4,6 +4,7 @@
 
 @dataclass
 class HTTP1Settings:
+    header_read_timeout: int = 30_000
     keep_alive: bool = True
     max_buffer_size: int = 8192 + 4096 * 100
     pipeline_flush: bool = False
diff --git a/src/conversion.rs b/src/conversion.rs
index aceed557..249697ea 100644
--- a/src/conversion.rs
+++ b/src/conversion.rs
@@ -56,11 +56,16 @@ impl<'p> IntoPyObject<'p> for FutureResultToPy {
 pub(crate) fn worker_http1_config_from_py(py: Python, cfg: Option<PyObject>) -> PyResult<HTTP1Config> {
     let ret = match cfg {
         Some(cfg) => HTTP1Config {
+            header_read_timeout: cfg
+                .getattr(py, "header_read_timeout")?
+                .extract(py)
+                .map(core::time::Duration::from_millis)?,
             keep_alive: cfg.getattr(py, "keep_alive")?.extract(py)?,
             max_buffer_size: cfg.getattr(py, "max_buffer_size")?.extract(py)?,
             pipeline_flush: cfg.getattr(py, "pipeline_flush")?.extract(py)?,
         },
         None => HTTP1Config {
+            header_read_timeout: core::time::Duration::from_secs(30),
             keep_alive: true,
             max_buffer_size: 8192 + 4096 * 100,
             pipeline_flush: false,
@@ -75,10 +80,11 @@ pub(crate) fn worker_http2_config_from_py(py: Python, cfg: Option<PyObject>) -> PyResult<HTTP2Config> {
             adaptive_window: cfg.getattr(py, "adaptive_window")?.extract(py)?,
             initial_connection_window_size: cfg.getattr(py, "initial_connection_window_size")?.extract(py)?,
             initial_stream_window_size: cfg.getattr(py, "initial_stream_window_size")?.extract(py)?,
-            keep_alive_interval: match cfg.getattr(py, "keep_alive_interval")?.extract(py) {
-                Ok(v) => Some(core::time::Duration::from_secs(v)),
-                _ => None,
-            },
+            keep_alive_interval: cfg
+                .getattr(py, "keep_alive_interval")?
+                .extract(py)
+                .ok()
+                .map(core::time::Duration::from_millis),
             keep_alive_timeout: core::time::Duration::from_secs(cfg.getattr(py, "keep_alive_timeout")?.extract(py)?),
             max_concurrent_streams: cfg.getattr(py, "max_concurrent_streams")?.extract(py)?,
             max_frame_size: cfg.getattr(py, "max_frame_size")?.extract(py)?,
diff --git a/src/io.rs b/src/io.rs
index a8ac6ce2..4c9628ad 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -1,10 +1,14 @@
 use std::{
+    future::Future,
     marker::{PhantomData, Unpin},
     pin::Pin,
     task::{Context, Poll},
+    time::{Duration, Instant},
 };
 
+use hyper::rt::{Sleep, Timer};
 use hyper_util::rt::TokioIo;
+use pin_project_lite::pin_project;
 
 pub(crate) struct IOTypeNotSend {
     _marker: PhantomData<*const ()>,
@@ -49,3 +53,57 @@ where
         Pin::new(&mut self.stream).poll_read(cx, buf)
     }
 }
+
+#[derive(Clone, Debug)]
+pub struct TokioTimer;
+
+impl Timer for TokioTimer {
+    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
+        Box::pin(TokioSleep {
+            inner: tokio::time::sleep(duration),
+        })
+    }
+
+    fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
+        Box::pin(TokioSleep {
+            inner: tokio::time::sleep_until(deadline.into()),
+        })
+    }
+
+    fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
+        if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
+            sleep.reset(new_deadline);
+        }
+    }
+}
+
+impl TokioTimer {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
+// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
+pin_project! {
+    pub(crate) struct TokioSleep {
+        #[pin]
+        pub(crate) inner: tokio::time::Sleep,
+    }
+}
+
+impl Future for TokioSleep {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.project().inner.poll(cx)
+    }
+}
+
+impl Sleep for TokioSleep {}
+
+impl TokioSleep {
+    pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
+        self.project().inner.as_mut().reset(deadline.into());
+    }
+}
diff --git a/src/workers.rs b/src/workers.rs
index 2e0ae230..dfd81703 100644
--- a/src/workers.rs
+++ b/src/workers.rs
@@ -67,6 +67,7 @@ impl WorkerSignalSync {
 
 #[derive(Clone)]
 pub(crate) struct HTTP1Config {
+    pub header_read_timeout: core::time::Duration,
     pub keep_alive: bool,
     pub max_buffer_size: usize,
     pub pipeline_flush: bool,
@@ -275,11 +276,14 @@ macro_rules! handle_connection_http1 {
             $spawner(async move {
                 let svc =
                     crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
-                let mut conn = hyper::server::conn::http1::Builder::new();
-                conn.keep_alive($http_opts.keep_alive);
-                conn.max_buf_size($http_opts.max_buffer_size);
-                conn.pipeline_flush($http_opts.pipeline_flush);
-                let _ = conn.serve_connection($stream_wrapper(stream), svc).await;
+                let _ = hyper::server::conn::http1::Builder::new()
+                    .timer(crate::io::TokioTimer::new())
+                    .header_read_timeout($http_opts.header_read_timeout)
+                    .keep_alive($http_opts.keep_alive)
+                    .max_buf_size($http_opts.max_buffer_size)
+                    .pipeline_flush($http_opts.pipeline_flush)
+                    .serve_connection($stream_wrapper(stream), svc)
+                    .await;
                 drop(permit);
             });
         }
@@ -294,11 +298,12 @@ macro_rules! handle_connection_http1_upgrades {
             $spawner(async move {
                 let svc =
                     crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
-                let mut conn = hyper::server::conn::http1::Builder::new();
-                conn.keep_alive($http_opts.keep_alive);
-                conn.max_buf_size($http_opts.max_buffer_size);
-                conn.pipeline_flush($http_opts.pipeline_flush);
-                let _ = conn
+                let _ = hyper::server::conn::http1::Builder::new()
+                    .timer(crate::io::TokioTimer::new())
+                    .header_read_timeout($http_opts.header_read_timeout)
+                    .keep_alive($http_opts.keep_alive)
+                    .max_buf_size($http_opts.max_buffer_size)
+                    .pipeline_flush($http_opts.pipeline_flush)
                     .serve_connection($stream_wrapper(stream), svc)
                     .with_upgrades()
                     .await;
@@ -316,17 +321,19 @@ macro_rules! handle_connection_http2 {
             $spawner(async move {
                 let svc =
                     crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
-                let mut conn = hyper::server::conn::http2::Builder::new($executor_builder());
-                conn.adaptive_window($http_opts.adaptive_window);
-                conn.initial_connection_window_size($http_opts.initial_connection_window_size);
-                conn.initial_stream_window_size($http_opts.initial_stream_window_size);
-                conn.keep_alive_interval($http_opts.keep_alive_interval);
-                conn.keep_alive_timeout($http_opts.keep_alive_timeout);
-                conn.max_concurrent_streams($http_opts.max_concurrent_streams);
-                conn.max_frame_size($http_opts.max_frame_size);
-                conn.max_header_list_size($http_opts.max_headers_size);
-                conn.max_send_buf_size($http_opts.max_send_buffer_size);
-                let _ = conn.serve_connection($stream_wrapper(stream), svc).await;
+                let _ = hyper::server::conn::http2::Builder::new($executor_builder())
+                    .timer(crate::io::TokioTimer::new())
+                    .adaptive_window($http_opts.adaptive_window)
+                    .initial_connection_window_size($http_opts.initial_connection_window_size)
+                    .initial_stream_window_size($http_opts.initial_stream_window_size)
+                    .keep_alive_interval($http_opts.keep_alive_interval)
+                    .keep_alive_timeout($http_opts.keep_alive_timeout)
+                    .max_concurrent_streams($http_opts.max_concurrent_streams)
+                    .max_frame_size($http_opts.max_frame_size)
+                    .max_header_list_size($http_opts.max_headers_size)
+                    .max_send_buf_size($http_opts.max_send_buffer_size)
+                    .serve_connection($stream_wrapper(stream), svc)
+                    .await;
                 drop(permit);
             });
         }
@@ -342,20 +349,23 @@ macro_rules! handle_connection_httpa {
                 let svc =
                     crate::workers::build_service!(local_addr, remote_addr, callback_wrapper, rth, $target, $proto);
                 let mut conn = hyper_util::server::conn::auto::Builder::new($executor_builder());
-                conn.http1().keep_alive($http1_opts.keep_alive);
-                conn.http1().max_buf_size($http1_opts.max_buffer_size);
-                conn.http1().pipeline_flush($http1_opts.pipeline_flush);
-                conn.http2().adaptive_window($http2_opts.adaptive_window);
-                conn.http2()
-                    .initial_connection_window_size($http2_opts.initial_connection_window_size);
+                conn.http1()
+                    .timer(crate::io::TokioTimer::new())
+                    .header_read_timeout($http1_opts.header_read_timeout)
+                    .keep_alive($http1_opts.keep_alive)
+                    .max_buf_size($http1_opts.max_buffer_size)
+                    .pipeline_flush($http1_opts.pipeline_flush);
                 conn.http2()
-                    .initial_stream_window_size($http2_opts.initial_stream_window_size);
-                conn.http2().keep_alive_interval($http2_opts.keep_alive_interval);
-                conn.http2().keep_alive_timeout($http2_opts.keep_alive_timeout);
-                conn.http2().max_concurrent_streams($http2_opts.max_concurrent_streams);
-                conn.http2().max_frame_size($http2_opts.max_frame_size);
-                conn.http2().max_header_list_size($http2_opts.max_headers_size);
-                conn.http2().max_send_buf_size($http2_opts.max_send_buffer_size);
+                    .timer(crate::io::TokioTimer::new())
+                    .adaptive_window($http2_opts.adaptive_window)
+                    .initial_connection_window_size($http2_opts.initial_connection_window_size)
+                    .initial_stream_window_size($http2_opts.initial_stream_window_size)
+                    .keep_alive_interval($http2_opts.keep_alive_interval)
+                    .keep_alive_timeout($http2_opts.keep_alive_timeout)
+                    .max_concurrent_streams($http2_opts.max_concurrent_streams)
+                    .max_frame_size($http2_opts.max_frame_size)
+                    .max_header_list_size($http2_opts.max_headers_size)
+                    .max_send_buf_size($http2_opts.max_send_buffer_size);
                 let _ = conn.$conn_method($stream_wrapper(stream), svc).await;
                 drop(permit);
             });
diff --git a/src/ws.rs b/src/ws.rs
index b291b457..27118ebb 100644
--- a/src/ws.rs
+++ b/src/ws.rs
@@ -4,7 +4,7 @@ use hyper::{
     http::response::Builder,
     Request, Response, StatusCode,
 };
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
 use std::{
     future::Future,
     pin::Pin,
@@ -28,12 +28,13 @@
 pub(crate) type WSStream = WebSocketStream;
 pub(crate) type WSTxStream = futures::stream::SplitSink;
 
-#[pin_project]
-#[derive(Debug)]
-pub struct HyperWebsocket {
-    #[pin]
-    inner: hyper::upgrade::OnUpgrade,
-    config: Option,
+pin_project! {
+    #[derive(Debug)]
+    pub struct HyperWebsocket {
+        #[pin]
+        inner: hyper::upgrade::OnUpgrade,
+        config: Option,
+    }
 }
 
 impl Future for HyperWebsocket {
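
Usage sketch (not part of the patch): the new `header_read_timeout` knob and the keep-alive unit change can be exercised from the Python API roughly as below. This is a minimal sketch, assuming the usual `granian.Granian` entry point accepts the same `http1_settings`/`http2_settings` keyword arguments that `granian/cli.py` forwards above, that the remaining `HTTP2Settings` fields keep their defaults, and that `'app:app'` stands in for a real application target.

```python
# Sketch only: wires up the settings touched by this diff.
from granian import Granian
from granian.http import HTTP1Settings, HTTP2Settings

server = Granian(
    'app:app',  # hypothetical application target
    http1_settings=HTTP1Settings(
        header_read_timeout=30_000,  # new field, milliseconds (CLI clamps to 1..60000)
        keep_alive=True,
        max_buffer_size=8192 + 4096 * 100,
        pipeline_flush=False,
    ),
    http2_settings=HTTP2Settings(
        keep_alive_interval=15_000,  # now read as milliseconds (Duration::from_millis)
        keep_alive_timeout=20,  # still seconds (Duration::from_secs)
    ),
)
server.serve()
```

The same values are reachable from the CLI via `--http1-header-read-timeout` and `--http2-keep-alive-interval`. Note the unit asymmetry the Rust side now encodes: `keep_alive_interval` is converted with `Duration::from_millis`, while `keep_alive_timeout` (and the 30-second fallback for `header_read_timeout`) still go through `Duration::from_secs`.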