diff --git a/.gitignore b/.gitignore index eb779b34..a911f6c5 100644 --- a/.gitignore +++ b/.gitignore @@ -83,4 +83,5 @@ keylog.txt *.jpg *.csv *.png -docs/site \ No newline at end of file +docs/site +*.pdb \ No newline at end of file diff --git a/src/client/resp/http.rs b/src/client/resp/http.rs index d94b797f..ec30894d 100644 --- a/src/client/resp/http.rs +++ b/src/client/resp/http.rs @@ -2,9 +2,12 @@ use std::{fmt::Display, sync::Arc}; use arc_swap::ArcSwapOption; use bytes::Bytes; -use futures_util::TryFutureExt; +use futures_util::{ + TryFutureExt, + future::{self, BoxFuture}, +}; use http::response::{Parts, Response as HttpResponse}; -use http_body_util::BodyExt; +use http_body_util::{BodyExt, Collected}; use pyo3::{coroutine::CancelHandle, prelude::*, pybacked::PyBackedStr}; use wreq::{self, Uri}; @@ -25,7 +28,6 @@ use crate::{ }; /// A response from a request. -#[derive(Clone)] #[pyclass(subclass, frozen, str, skip_from_py_object)] pub struct Response { uri: Uri, @@ -51,69 +53,57 @@ impl Response { /// Create a new [`Response`] instance. pub fn new(response: wreq::Response) -> Self { let uri = response.uri().clone(); - let response = HttpResponse::from(response); + let response = HttpResponse::from(response) + .map(Body::Streamable) + .map(ArcSwapOption::from_pointee) + .map(Arc::new); let (parts, body) = response.into_parts(); - Response { - uri, - parts, - body: Arc::new(ArcSwapOption::from_pointee(Body::Streamable(body))), - } + Response { uri, parts, body } } - /// Builds a `wreq::Response` from the current response metadata and the given body. - /// - /// This creates a new HTTP response with the same version, status, headers, and extensions - /// as the current response, but with the provided body. - fn build_response(&self, body: wreq::Body) -> wreq::Response { - let mut response = HttpResponse::new(body); - *response.version_mut() = self.parts.version; - *response.status_mut() = self.parts.status; - *response.headers_mut() = self.parts.headers.clone(); - *response.extensions_mut() = self.parts.extensions.clone(); + /// Builds a [`wreq::Response`] from the current response metadata and the given body. + fn build_response>(&self, body: T) -> wreq::Response { + let response = HttpResponse::from_parts(self.parts.clone(), body); wreq::Response::from(response) } - /// Creates an empty response with the same metadata but no body content. - /// - /// Useful for operations that only need response headers/metadata without consuming the body. + /// Creates an empty [`wreq::Response`] with the same metadata but no body content. fn empty_response(&self) -> wreq::Response { - self.build_response(wreq::Body::from(Bytes::new())) + self.build_response(Bytes::new()) } - /// Consumes the response body and caches it in memory for reuse. - /// - /// If the body is streamable, it will be fully read into memory and cached. - /// If the body is already cached, it will be cloned and reused. - /// Returns an error if the body has already been consumed or if reading fails. - async fn cache_response(self) -> Result { + /// Consumes the response [`Body`] and caches it in memory for reuse. + fn cache_response(&self) -> BoxFuture<'static, Result> { if let Some(arc) = self.body.swap(None) { - match Arc::try_unwrap(arc) { - Ok(Body::Streamable(body)) => { - let bytes = BodyExt::collect(body) - .await - .map(|buf| buf.to_bytes()) - .map_err(Error::Library)?; - - self.body - .store(Some(Arc::new(Body::Reusable(bytes.clone())))); - Ok(self.build_response(wreq::Body::from(bytes))) + let parts = self.parts.clone(); + let body = self.body.clone(); + match Arc::into_inner(arc) { + Some(Body::Streamable(stream)) => { + return Box::pin(async move { + let bytes = stream + .collect() + .await + .map(Collected::to_bytes) + .map_err(Error::Library)?; + + body.store(Some(Arc::new(Body::Reusable(bytes.clone())))); + let response = HttpResponse::from_parts(parts, bytes); + Ok(wreq::Response::from(response)) + }); } - Ok(Body::Reusable(bytes)) => { - self.body - .store(Some(Arc::new(Body::Reusable(bytes.clone())))); - Ok(self.build_response(wreq::Body::from(bytes))) + Some(Body::Reusable(bytes)) => { + body.store(Some(Arc::new(Body::Reusable(bytes.clone())))); + let response = HttpResponse::from_parts(parts, bytes); + return Box::pin(future::ok(wreq::Response::from(response))); } - _ => Err(Error::Memory), + None => unreachable!("Arc should never be empty here"), } - } else { - Err(Error::Memory) } + + Box::pin(future::err(Error::Memory)) } - /// Consumes the response body for streaming without caching. - /// - /// This method transfers ownership of the streamable body for one-time use. - /// Returns an error if the body has already been consumed or is not streamable. + /// Consumes the response [`Body`] for streaming without caching. fn stream_response(&self) -> Result { if let Some(arc) = self.body.swap(None) { if let Ok(Body::Streamable(body)) = Arc::try_unwrap(arc) { @@ -123,7 +113,7 @@ impl Response { Err(Error::Memory) } - /// Forcefully destroys the response body, preventing any further reads. + /// Forcefully destroys the response [`Body`], preventing any further reads. fn destroy(&self) { if let Some(body) = self.body.swap(None) { if let Ok(body) = Arc::try_unwrap(body) { @@ -193,8 +183,7 @@ impl Response { #[getter] pub fn history(&self, py: Python) -> Vec { py.detach(|| { - self.clone() - .empty_response() + self.empty_response() .extensions() .get::() .map_or_else(Vec::new, |history| { @@ -207,8 +196,7 @@ impl Response { #[getter] pub fn tls_info(&self, py: Python) -> Option { py.detach(|| { - self.clone() - .empty_response() + self.empty_response() .extensions() .get::() .cloned() @@ -218,8 +206,7 @@ impl Response { /// Turn a response into an error if the server returned an error. pub fn raise_for_status(&self) -> PyResult<()> { - self.clone() - .empty_response() + self.empty_response() .error_for_status() .map(|_| ()) .map_err(Error::Library) @@ -241,7 +228,6 @@ impl Response { encoding: Option, ) -> PyResult { let fut = self - .clone() .cache_response() .and_then(|resp| ResponseExt::text(resp, encoding)) .map_err(Into::into); @@ -251,7 +237,6 @@ impl Response { /// Get the JSON content of the response. pub async fn json(&self, #[pyo3(cancel_handle)] cancel: CancelHandle) -> PyResult { let fut = self - .clone() .cache_response() .and_then(ResponseExt::json::) .map_err(Into::into); @@ -261,7 +246,6 @@ impl Response { /// Get the bytes content of the response. pub async fn bytes(&self, #[pyo3(cancel_handle)] cancel: CancelHandle) -> PyResult { let fut = self - .clone() .cache_response() .and_then(ResponseExt::bytes) .map_ok(PyBuffer::from) @@ -404,7 +388,6 @@ impl BlockingResponse { py.detach(|| { let fut = self .0 - .clone() .cache_response() .and_then(|resp| ResponseExt::text(resp, encoding)) .map_err(Into::into); @@ -417,7 +400,6 @@ impl BlockingResponse { py.detach(|| { let fut = self .0 - .clone() .cache_response() .and_then(ResponseExt::json::) .map_err(Into::into); @@ -430,7 +412,6 @@ impl BlockingResponse { py.detach(|| { let fut = self .0 - .clone() .cache_response() .and_then(ResponseExt::bytes) .map_ok(PyBuffer::from)