Skip to content

Commit b8b4487

Browse files
committed
fix(response): ensure response remains cached when valid
1 parent 7b6b378 commit b8b4487

2 files changed

Lines changed: 46 additions & 63 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,5 @@ keylog.txt
8383
*.jpg
8484
*.csv
8585
*.png
86-
docs/site
86+
docs/site
87+
*.pdb

src/client/resp/http.rs

Lines changed: 44 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ use std::{fmt::Display, sync::Arc};
22

33
use arc_swap::ArcSwapOption;
44
use bytes::Bytes;
5-
use futures_util::TryFutureExt;
5+
use futures_util::{
6+
TryFutureExt,
7+
future::{self, BoxFuture},
8+
};
69
use http::response::{Parts, Response as HttpResponse};
7-
use http_body_util::BodyExt;
10+
use http_body_util::{BodyExt, Collected};
811
use pyo3::{coroutine::CancelHandle, prelude::*, pybacked::PyBackedStr};
912
use wreq::{self, Uri};
1013

@@ -25,7 +28,6 @@ use crate::{
2528
};
2629

2730
/// A response from a request.
28-
#[derive(Clone)]
2931
#[pyclass(subclass, frozen, str, skip_from_py_object)]
3032
pub struct Response {
3133
uri: Uri,
@@ -51,69 +53,58 @@ impl Response {
5153
/// Create a new [`Response`] instance.
5254
pub fn new(response: wreq::Response) -> Self {
5355
let uri = response.uri().clone();
54-
let response = HttpResponse::from(response);
56+
let response = HttpResponse::from(response)
57+
.map(Body::Streamable)
58+
.map(ArcSwapOption::from_pointee)
59+
.map(Arc::new);
5560
let (parts, body) = response.into_parts();
56-
Response {
57-
uri,
58-
parts,
59-
body: Arc::new(ArcSwapOption::from_pointee(Body::Streamable(body))),
60-
}
61+
Response { uri, parts, body }
6162
}
6263

63-
/// Builds a `wreq::Response` from the current response metadata and the given body.
64-
///
65-
/// This creates a new HTTP response with the same version, status, headers, and extensions
66-
/// as the current response, but with the provided body.
67-
fn build_response(&self, body: wreq::Body) -> wreq::Response {
68-
let mut response = HttpResponse::new(body);
69-
*response.version_mut() = self.parts.version;
70-
*response.status_mut() = self.parts.status;
71-
*response.headers_mut() = self.parts.headers.clone();
72-
*response.extensions_mut() = self.parts.extensions.clone();
64+
/// Builds a [`wreq::Response`] from the current response metadata and the given body.
65+
fn build_response<T: Into<wreq::Body>>(&self, body: T) -> wreq::Response {
66+
let response = HttpResponse::from_parts(self.parts.clone(), body);
7367
wreq::Response::from(response)
7468
}
7569

76-
/// Creates an empty response with the same metadata but no body content.
77-
///
78-
/// Useful for operations that only need response headers/metadata without consuming the body.
70+
/// Creates an empty [`wreq::Response`] with the same metadata but no body content.
7971
fn empty_response(&self) -> wreq::Response {
80-
self.build_response(wreq::Body::from(Bytes::new()))
72+
self.build_response(Bytes::new())
8173
}
8274

83-
/// Consumes the response body and caches it in memory for reuse.
84-
///
85-
/// If the body is streamable, it will be fully read into memory and cached.
86-
/// If the body is already cached, it will be cloned and reused.
87-
/// Returns an error if the body has already been consumed or if reading fails.
88-
async fn cache_response(self) -> Result<wreq::Response, Error> {
89-
if let Some(arc) = self.body.swap(None) {
75+
/// Consumes the response [`Body`] and caches it in memory for reuse.
76+
fn cache_response(&self) -> BoxFuture<'static, Result<wreq::Response, Error>> {
77+
let body = self.body.clone();
78+
if let Some(arc) = body.swap(None) {
9079
match Arc::try_unwrap(arc) {
91-
Ok(Body::Streamable(body)) => {
92-
let bytes = BodyExt::collect(body)
93-
.await
94-
.map(|buf| buf.to_bytes())
95-
.map_err(Error::Library)?;
96-
97-
self.body
98-
.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
99-
Ok(self.build_response(wreq::Body::from(bytes)))
80+
Ok(Body::Streamable(stream)) => {
81+
let parts = self.parts.clone();
82+
return Box::pin(async move {
83+
let bytes = stream
84+
.collect()
85+
.await
86+
.map(Collected::to_bytes)
87+
.map_err(Error::Library)?;
88+
89+
body.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
90+
let response = HttpResponse::from_parts(parts, bytes);
91+
Ok(wreq::Response::from(response))
92+
});
10093
}
10194
Ok(Body::Reusable(bytes)) => {
102-
self.body
103-
.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
104-
Ok(self.build_response(wreq::Body::from(bytes)))
95+
let parts = self.parts.clone();
96+
body.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
97+
let response = HttpResponse::from_parts(parts, bytes);
98+
return Box::pin(future::ok(wreq::Response::from(response)));
10599
}
106-
_ => Err(Error::Memory),
100+
Err(arc) => body.store(Some(arc)),
107101
}
108-
} else {
109-
Err(Error::Memory)
110102
}
103+
104+
Box::pin(future::err(Error::Memory))
111105
}
112106

113-
/// Consumes the response body for streaming without caching.
114-
///
115-
/// This method transfers ownership of the streamable body for one-time use.
116-
/// Returns an error if the body has already been consumed or is not streamable.
107+
/// Consumes the response [`Body`] for streaming without caching.
117108
fn stream_response(&self) -> Result<wreq::Response, Error> {
118109
if let Some(arc) = self.body.swap(None) {
119110
if let Ok(Body::Streamable(body)) = Arc::try_unwrap(arc) {
@@ -123,7 +114,7 @@ impl Response {
123114
Err(Error::Memory)
124115
}
125116

126-
/// Forcefully destroys the response body, preventing any further reads.
117+
/// Forcefully destroys the response [`Body`], preventing any further reads.
127118
fn destroy(&self) {
128119
if let Some(body) = self.body.swap(None) {
129120
if let Ok(body) = Arc::try_unwrap(body) {
@@ -193,8 +184,7 @@ impl Response {
193184
#[getter]
194185
pub fn history(&self, py: Python) -> Vec<History> {
195186
py.detach(|| {
196-
self.clone()
197-
.empty_response()
187+
self.empty_response()
198188
.extensions()
199189
.get::<wreq::redirect::History>()
200190
.map_or_else(Vec::new, |history| {
@@ -207,8 +197,7 @@ impl Response {
207197
#[getter]
208198
pub fn tls_info(&self, py: Python) -> Option<TlsInfo> {
209199
py.detach(|| {
210-
self.clone()
211-
.empty_response()
200+
self.empty_response()
212201
.extensions()
213202
.get::<wreq::tls::TlsInfo>()
214203
.cloned()
@@ -218,8 +207,7 @@ impl Response {
218207

219208
/// Turn a response into an error if the server returned an error.
220209
pub fn raise_for_status(&self) -> PyResult<()> {
221-
self.clone()
222-
.empty_response()
210+
self.empty_response()
223211
.error_for_status()
224212
.map(|_| ())
225213
.map_err(Error::Library)
@@ -241,7 +229,6 @@ impl Response {
241229
encoding: Option<PyBackedStr>,
242230
) -> PyResult<String> {
243231
let fut = self
244-
.clone()
245232
.cache_response()
246233
.and_then(|resp| ResponseExt::text(resp, encoding))
247234
.map_err(Into::into);
@@ -251,7 +238,6 @@ impl Response {
251238
/// Get the JSON content of the response.
252239
pub async fn json(&self, #[pyo3(cancel_handle)] cancel: CancelHandle) -> PyResult<Json> {
253240
let fut = self
254-
.clone()
255241
.cache_response()
256242
.and_then(ResponseExt::json::<Json>)
257243
.map_err(Into::into);
@@ -261,7 +247,6 @@ impl Response {
261247
/// Get the bytes content of the response.
262248
pub async fn bytes(&self, #[pyo3(cancel_handle)] cancel: CancelHandle) -> PyResult<PyBuffer> {
263249
let fut = self
264-
.clone()
265250
.cache_response()
266251
.and_then(ResponseExt::bytes)
267252
.map_ok(PyBuffer::from)
@@ -404,7 +389,6 @@ impl BlockingResponse {
404389
py.detach(|| {
405390
let fut = self
406391
.0
407-
.clone()
408392
.cache_response()
409393
.and_then(|resp| ResponseExt::text(resp, encoding))
410394
.map_err(Into::into);
@@ -417,7 +401,6 @@ impl BlockingResponse {
417401
py.detach(|| {
418402
let fut = self
419403
.0
420-
.clone()
421404
.cache_response()
422405
.and_then(ResponseExt::json::<Json>)
423406
.map_err(Into::into);
@@ -430,7 +413,6 @@ impl BlockingResponse {
430413
py.detach(|| {
431414
let fut = self
432415
.0
433-
.clone()
434416
.cache_response()
435417
.and_then(ResponseExt::bytes)
436418
.map_ok(PyBuffer::from)

0 commit comments

Comments
 (0)