From 9f33d81017030a6b7db770442efab29d8761b3e9 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Tue, 7 Apr 2026 16:46:40 -0700 Subject: [PATCH 1/2] parse sql responses more permissively --- src/http.rs | 44 ++++++++++++++++++++++++++++++++++++++++---- src/sql.rs | 17 ++++++++++++----- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/src/http.rs b/src/http.rs index e077add..5a7605a 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use reqwest::header::HeaderValue; +use reqwest::header::{HeaderValue, CONTENT_TYPE}; use reqwest::Client; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -80,7 +80,7 @@ impl ApiClient { return Err(HttpError { status, body }.into()); } - response.json().await.context("failed to parse response") + parse_json_response(response).await } pub async fn post(&self, path: &str, body: &B) -> Result { @@ -100,7 +100,7 @@ impl ApiClient { return Err(HttpError { status, body }.into()); } - response.json().await.context("failed to parse response") + parse_json_response(response).await } pub async fn post_with_headers( @@ -146,7 +146,7 @@ impl ApiClient { return Err(HttpError { status, body }.into()); } - response.json().await.context("failed to parse response") + parse_json_response(response).await } pub async fn post_with_headers_raw( @@ -204,6 +204,42 @@ impl ApiClient { } } +async fn parse_json_response(response: reqwest::Response) -> Result { + let content_type = response + .headers() + .get(CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(str::to_owned); + let body = response + .text() + .await + .context("failed to read response body")?; + serde_json::from_str(&body).with_context(|| { + let mut message = String::from("failed to parse response"); + if let Some(content_type) = content_type.as_deref() { + message.push_str(&format!(" (content-type: {content_type})")); + } + let preview = preview_response_body(&body, 512); + if !preview.is_empty() { + message.push_str(&format!("; body: {preview}")); + } + message + }) +} + +fn preview_response_body(body: &str, max_chars: usize) -> String { + let trimmed = body.trim(); + if trimmed.is_empty() { + return String::new(); + } + + let mut preview = trimmed.chars().take(max_chars).collect::(); + if trimmed.chars().count() > max_chars { + preview.push_str("..."); + } + preview +} + const UPLOAD_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); pub async fn put_signed_url( diff --git a/src/sql.rs b/src/sql.rs index a862ab5..682e06b 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -58,15 +58,22 @@ struct SqlResponse { #[derive(Debug, Serialize, Deserialize)] struct FreshnessState { - pub last_considered_xact_id: String, - pub last_processed_xact_id: String, + #[serde(default)] + pub last_considered_xact_id: Option, + #[serde(default)] + pub last_processed_xact_id: Option, } #[derive(Debug, Serialize, Deserialize)] struct RealtimeState { - pub actual_xact_id: String, - pub minimum_xact_id: String, - pub read_bytes: u64, + #[serde(default)] + pub actual_xact_id: Option, + #[serde(default)] + pub minimum_xact_id: Option, + #[serde(default)] + pub read_bytes: Option, + #[serde(default)] + pub timeout_ms: Option, #[serde(rename = "type")] pub state_type: String, } From 0896c310dd1327ae31dc1a5a3e5310f94f4d510c Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Tue, 7 Apr 2026 16:58:33 -0700 Subject: [PATCH 2/2] bump --- src/http.rs | 44 ++++---------------------------------------- 1 file changed, 4 insertions(+), 40 deletions(-) diff --git a/src/http.rs b/src/http.rs index 5a7605a..e077add 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use reqwest::header::{HeaderValue, CONTENT_TYPE}; +use reqwest::header::HeaderValue; use reqwest::Client; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -80,7 +80,7 @@ impl ApiClient { return Err(HttpError { status, body }.into()); } - parse_json_response(response).await + response.json().await.context("failed to parse response") } pub async fn post(&self, path: &str, body: &B) -> Result { @@ -100,7 +100,7 @@ impl ApiClient { return Err(HttpError { status, body }.into()); } - parse_json_response(response).await + response.json().await.context("failed to parse response") } pub async fn post_with_headers( @@ -146,7 +146,7 @@ impl ApiClient { return Err(HttpError { status, body }.into()); } - parse_json_response(response).await + response.json().await.context("failed to parse response") } pub async fn post_with_headers_raw( @@ -204,42 +204,6 @@ impl ApiClient { } } -async fn parse_json_response(response: reqwest::Response) -> Result { - let content_type = response - .headers() - .get(CONTENT_TYPE) - .and_then(|value| value.to_str().ok()) - .map(str::to_owned); - let body = response - .text() - .await - .context("failed to read response body")?; - serde_json::from_str(&body).with_context(|| { - let mut message = String::from("failed to parse response"); - if let Some(content_type) = content_type.as_deref() { - message.push_str(&format!(" (content-type: {content_type})")); - } - let preview = preview_response_body(&body, 512); - if !preview.is_empty() { - message.push_str(&format!("; body: {preview}")); - } - message - }) -} - -fn preview_response_body(body: &str, max_chars: usize) -> String { - let trimmed = body.trim(); - if trimmed.is_empty() { - return String::new(); - } - - let mut preview = trimmed.chars().take(max_chars).collect::(); - if trimmed.chars().count() > max_chars { - preview.push_str("..."); - } - preview -} - const UPLOAD_HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); pub async fn put_signed_url(