From 571d443d2cad447ef74d14fda6f8fbe5def6d938 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Wed, 17 Jan 2024 14:15:50 +0100 Subject: [PATCH] Ensure `http.disconnect` event in ASGI protocol (#174) --- src/asgi/io.rs | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/asgi/io.rs b/src/asgi/io.rs index 7dcfbfb8..8dff6954 100644 --- a/src/asgi/io.rs +++ b/src/asgi/io.rs @@ -44,6 +44,8 @@ pub(crate) struct ASGIHTTPProtocol { response_status: Option, response_headers: Option, body_tx: Option>>, + flow_rx_exhausted: Arc>, + flow_tx_waiter: Arc, } impl ASGIHTTPProtocol { @@ -57,6 +59,8 @@ impl ASGIHTTPProtocol { response_status: None, response_headers: None, body_tx: None, + flow_rx_exhausted: Arc::new(std::sync::RwLock::new(false)), + flow_tx_waiter: Arc::new(tokio::sync::Notify::new()), } } @@ -96,7 +100,20 @@ impl ASGIHTTPProtocol { #[pymethods] impl ASGIHTTPProtocol { fn receive<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> { + if *self.flow_rx_exhausted.read().unwrap() { + let holder = self.flow_tx_waiter.clone(); + return future_into_py_futlike(self.rt.clone(), py, async move { + let () = holder.notified().await; + Python::with_gil(|py| { + let dict = PyDict::new(py); + dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.disconnect"))?; + Ok(dict.to_object(py)) + }) + }); + } + let body_ref = self.request_body.clone(); + let flow_ref = self.flow_rx_exhausted.clone(); future_into_py_iter(self.rt.clone(), py, async move { let mut bodym = body_ref.lock().await; let body = &mut *bodym; @@ -110,6 +127,11 @@ impl ASGIHTTPProtocol { } _ => body::Bytes::new(), }; + if !more_body { + let mut flow = flow_ref.write().unwrap(); + *flow = true; + } + Python::with_gil(|py| { let dict = PyDict::new(py); dict.set_item(pyo3::intern!(py, "type"), pyo3::intern!(py, "http.request"))?; @@ -143,6 +165,7 @@ impl ASGIHTTPProtocol { .map_err(|e| match e {}) .boxed(), ); + self.flow_tx_waiter.notify_one(); empty_future_into_py(py) } (true, true, false) => { @@ -164,10 +187,13 @@ impl ASGIHTTPProtocol { _ => error_flow!(), }, (true, false, true) => match self.body_tx.take() { - Some(tx) => match body.is_empty() { - false => self.send_body(py, tx, body), - true => empty_future_into_py(py), - }, + Some(tx) => { + self.flow_tx_waiter.notify_one(); + match body.is_empty() { + false => self.send_body(py, tx, body), + true => empty_future_into_py(py), + } + } _ => error_flow!(), }, _ => error_flow!(),