diff --git a/src/client/resp/http.rs b/src/client/resp/http.rs index 9ed82819..d94b797f 100644 --- a/src/client/resp/http.rs +++ b/src/client/resp/http.rs @@ -64,19 +64,19 @@ impl Response { /// /// This creates a new HTTP response with the same version, status, headers, and extensions /// as the current response, but with the provided body. - fn build_response(self, body: wreq::Body) -> wreq::Response { + fn build_response(&self, body: wreq::Body) -> wreq::Response { let mut response = HttpResponse::new(body); *response.version_mut() = self.parts.version; *response.status_mut() = self.parts.status; - *response.headers_mut() = self.parts.headers; - *response.extensions_mut() = self.parts.extensions; + *response.headers_mut() = self.parts.headers.clone(); + *response.extensions_mut() = self.parts.extensions.clone(); wreq::Response::from(response) } /// Creates an empty response with the same metadata but no body content. /// /// Useful for operations that only need response headers/metadata without consuming the body. - fn empty_response(self) -> wreq::Response { + fn empty_response(&self) -> wreq::Response { self.build_response(wreq::Body::from(Bytes::new())) } @@ -114,7 +114,7 @@ impl Response { /// /// This method transfers ownership of the streamable body for one-time use. /// Returns an error if the body has already been consumed or is not streamable. - fn stream_response(self) -> Result { + fn stream_response(&self) -> Result { if let Some(arc) = self.body.swap(None) { if let Ok(Body::Streamable(body)) = Arc::try_unwrap(arc) { return Ok(self.build_response(body)); @@ -122,6 +122,21 @@ impl Response { } Err(Error::Memory) } + + /// Forcefully destroys the response body, preventing any further reads. + fn destroy(&self) { + if let Some(body) = self.body.swap(None) { + if let Ok(body) = Arc::try_unwrap(body) { + ::std::mem::drop(body); + } + } + } +} + +impl Drop for Response { + fn drop(&mut self) { + self.destroy(); + } } #[pymethods] @@ -159,19 +174,19 @@ impl Response { /// Get the content length of the response. #[getter] pub fn content_length(&self, py: Python) -> Option { - py.detach(|| self.clone().empty_response().content_length()) + py.detach(|| self.empty_response().content_length()) } /// Get the remote address of the response. #[getter] pub fn remote_addr(&self, py: Python) -> Option { - py.detach(|| self.clone().empty_response().remote_addr().map(SocketAddr)) + py.detach(|| self.empty_response().remote_addr().map(SocketAddr)) } /// Get the local address of the response. #[getter] pub fn local_addr(&self, py: Python) -> Option { - py.detach(|| self.clone().empty_response().local_addr().map(SocketAddr)) + py.detach(|| self.empty_response().local_addr().map(SocketAddr)) } /// Get the redirect history of the Response. @@ -213,8 +228,7 @@ impl Response { /// Get the response into a `Stream` of `Bytes` from the body. pub fn stream(&self) -> PyResult { - self.clone() - .stream_response() + self.stream_response() .map(Streamer::new) .map_err(Into::into) } @@ -259,20 +273,21 @@ impl Response { /// /// **Current behavior:** /// - When connection pooling is **disabled**: This method closes the network connection. - /// - When connection pooling is **enabled**: This method closes the response, prevents further body reads, - /// and returns the connection to the pool for reuse. + /// - When connection pooling is **enabled**: This method closes the response, prevents further + /// body reads, and returns the connection to the pool for reuse. /// /// **Future changes:** - /// In future versions, this method will be changed to always close the network connection regardless of - /// whether connection pooling is enabled or not. + /// In future versions, this method will be changed to always close the network connection + /// regardless of whether connection pooling is enabled or not. /// /// **Recommendation:** - /// It is **not recommended** to manually call this method at present. Instead, use context managers - /// (async with statement) to properly manage response lifecycle. Wait for the improved implementation - /// in future versions. - pub async fn close(&self) -> PyResult<()> { - self.body.swap(None); - Ok(()) + /// It is **not recommended** to manually call this method at present. Instead, use context + /// managers (async with statement) to properly manage response lifecycle. Wait for the + /// improved implementation in future versions. + pub async fn close(&self) { + Python::attach(|py| { + py.detach(|| self.destroy()); + }); } } @@ -284,12 +299,7 @@ impl Response { } #[inline] - async fn __aexit__( - &self, - _exc_type: Py, - _exc_val: Py, - _traceback: Py, - ) -> PyResult<()> { + async fn __aexit__(&self, _exc_type: Py, _exc_val: Py, _traceback: Py) { self.close().await } } @@ -308,6 +318,12 @@ impl Display for Response { // ===== impl BlockingResponse ===== +impl Drop for BlockingResponse { + fn drop(&mut self) { + self.0.destroy(); + } +} + #[pymethods] impl BlockingResponse { /// Get the URL of the response. @@ -427,20 +443,20 @@ impl BlockingResponse { /// /// **Current behavior:** /// - When connection pooling is **disabled**: This method closes the network connection. - /// - When connection pooling is **enabled**: This method closes the response, prevents further body reads, - /// and returns the connection to the pool for reuse. + /// - When connection pooling is **enabled**: This method closes the response, prevents further + /// body reads, and returns the connection to the pool for reuse. /// /// **Future changes:** - /// In future versions, this method will be changed to always close the network connection regardless of - /// whether connection pooling is enabled or not. + /// In future versions, this method will be changed to always close the network connection + /// regardless of whether connection pooling is enabled or not. /// /// **Recommendation:** - /// It is **not recommended** to manually call this method at present. Instead, use context managers - /// (with statement) to properly manage response lifecycle. Wait for the improved implementation - /// in future versions. + /// It is **not recommended** to manually call this method at present. Instead, use context + /// managers (with statement) to properly manage response lifecycle. Wait for the improved + /// implementation in future versions. #[inline] pub fn close(&self, py: Python) { - py.detach(|| self.0.body.swap(None)); + py.detach(|| self.0.destroy()); } } diff --git a/src/lib.rs b/src/lib.rs index b1b39ede..f31046a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,15 +66,17 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; mod r#async { - use crate::client::{ - Client, - req::Request, - req::WebSocketRequest, - resp::{Response, WebSocket}, - }; - use crate::http::Method; use pyo3::{coroutine::CancelHandle, prelude::*, pybacked::PyBackedStr}; + use crate::{ + client::{ + Client, + req::{Request, WebSocketRequest}, + resp::{Response, WebSocket}, + }, + http::Method, + }; + /// Make a GET request with the given parameters. #[inline] #[pyfunction] @@ -198,15 +200,17 @@ mod r#async { } mod blocking { - use crate::client::{ - BlockingClient, - req::Request, - req::WebSocketRequest, - resp::{BlockingResponse, BlockingWebSocket}, - }; - use crate::http::Method; use pyo3::{prelude::*, pybacked::PyBackedStr}; + use crate::{ + client::{ + BlockingClient, + req::{Request, WebSocketRequest}, + resp::{BlockingResponse, BlockingWebSocket}, + }, + http::Method, + }; + /// Make a GET request with the given parameters (blocking). #[inline] #[pyfunction]