diff --git a/lib/src/datum_cloud.rs b/lib/src/datum_cloud.rs index 61a8a2f..fdebb86 100644 --- a/lib/src/datum_cloud.rs +++ b/lib/src/datum_cloud.rs @@ -14,7 +14,7 @@ use crate::http_user_agent::datum_http_user_agent; use crate::{ProjectControlPlaneClient, Repo, SelectedContext}; pub use self::{ - auth::{AuthClient, AuthState, LoginState, MaybeAuth, UserProfile}, + auth::{AuthClient, AuthState, LoginState, MaybeAuth, NotLoggedIn, Unauthorized, UserProfile}, env::ApiEnv, }; @@ -327,23 +327,15 @@ impl DatumCloudClient { async fn post_json(&self, url: &str, body: &serde_json::Value) -> Result<()> { tracing::debug!("POST {url}"); - - let auth_state = self.auth.load_refreshed().await?; - let auth = auth_state.get()?; - let res = self - .http - .post(url) - .header( - "Authorization", - format!("Bearer {}", auth.tokens.access_token.secret()), - ) - .header("Content-Type", "application/json") - .json(body) - .send() - .await - .inspect_err(|e| warn!(%url, "Failed to POST: {e:#}")) - .with_std_context(|_| format!("Failed to POST {url}"))?; + .request_with_auth_retry(|token| { + self.http + .post(url) + .header("Authorization", format!("Bearer {token}")) + .header("Content-Type", "application/json") + .json(body) + }) + .await?; let status = res.status(); if !status.is_success() { let text = match res.text().await { @@ -358,22 +350,13 @@ impl DatumCloudClient { async fn fetch_direct(&self, url: &str) -> Result { tracing::debug!("GET {url}"); - - // Refresh access token if they are close to expiring. 
-        let auth_state = self.auth.load_refreshed().await?;
-        let auth = auth_state.get()?;
-
         let res = self
-            .http
-            .get(url)
-            .header(
-                "Authorization",
-                format!("Bearer {}", auth.tokens.access_token.secret()),
-            )
-            .send()
-            .await
-            .inspect_err(|e| warn!(%url, "Failed to fetch: {e:#}"))
-            .with_std_context(|_| format!("Failed to fetch {url}"))?;
+            .request_with_auth_retry(|token| {
+                self.http
+                    .get(url)
+                    .header("Authorization", format!("Bearer {token}"))
+            })
+            .await?;
         let status = res.status();
         if !status.is_success() {
             let text = match res.text().await {
@@ -391,6 +374,57 @@ impl DatumCloudClient {
         Ok(json)
     }
 
+    /// Send an authenticated request and, on 401/403, force a token refresh and retry once.
+    /// If the retry still returns 401, clear the local auth state and return [`Unauthorized`]
+    /// so the UI redirects to login; a persistent 403 is handed back to the caller unchanged.
+    ///
+    /// The closure builds the request (sans `.send()`) given the current bearer token, so we
+    /// can rebuild it after a refresh without the caller having to reconstruct headers/body.
+    async fn request_with_auth_retry<F>(&self, build: F) -> Result<reqwest::Response>
+    where
+        F: Fn(&str) -> reqwest::RequestBuilder,
+    {
+        let auth_state = self.auth.load_refreshed().await?;
+        let auth = auth_state.get()?;
+        let res = build(auth.tokens.access_token.secret())
+            .send()
+            .await
+            .inspect_err(|e| warn!("Request failed: {e:#}"))
+            .std_context("HTTP request failed")?;
+        if !is_auth_failure(res.status()) {
+            return Ok(res);
+        }
+
+        warn!(
+            status = %res.status(),
+            "Server rejected token; attempting forced refresh"
+        );
+        if let Err(err) = self.auth.force_refresh().await {
+            warn!("Forced auth refresh failed: {err:#}");
+            return Err(Unauthorized.into());
+        }
+        let auth_state = self.auth.load();
+        let Ok(auth) = auth_state.get() else {
+            return Err(Unauthorized.into());
+        };
+        let retry = build(auth.tokens.access_token.secret())
+            .send()
+            .await
+            .inspect_err(|e| warn!("Retried request failed: {e:#}"))
+            .std_context("HTTP request retry failed")?;
+        if retry.status() == reqwest::StatusCode::UNAUTHORIZED {
+            warn!(
+                status = %retry.status(),
+                "Server still rejected token after refresh; logging out"
+            );
+            if let Err(err) = self.auth.logout().await {
+                warn!("Failed to clear auth state after persistent 401: {err:#}");
+            }
+            return Err(Unauthorized.into());
+        }
+        Ok(retry)
+    }
+
     fn project_control_plane_client_with_token(
         &self,
         project_id: &str,
@@ -704,3 +738,212 @@ fn invitation_name(org_id: &str) -> String {
         .to_lowercase();
     format!("{org_id}-{suffix}")
 }
+
+/// True if the response status indicates the bearer token is no longer accepted.
+fn is_auth_failure(status: reqwest::StatusCode) -> bool { + status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN +} + +#[cfg(test)] +mod auth_failure_tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + + use http_body_util::Full; + use hyper::body::Bytes; + use hyper::service::service_fn; + use hyper::{Request, Response}; + use hyper_util::rt::TokioIo; + use tokio::net::TcpListener; + + use super::*; + + #[test] + fn classifies_401_and_403_as_auth_failures() { + assert!(is_auth_failure(reqwest::StatusCode::UNAUTHORIZED)); + assert!(is_auth_failure(reqwest::StatusCode::FORBIDDEN)); + } + + #[test] + fn does_not_classify_other_statuses_as_auth_failures() { + assert!(!is_auth_failure(reqwest::StatusCode::OK)); + assert!(!is_auth_failure(reqwest::StatusCode::NOT_FOUND)); + assert!(!is_auth_failure(reqwest::StatusCode::INTERNAL_SERVER_ERROR)); + assert!(!is_auth_failure(reqwest::StatusCode::BAD_REQUEST)); + // 407 Proxy Authentication Required is distinct from end-user auth failures; + // we intentionally do not treat it as a bearer-token rejection. + assert!(!is_auth_failure( + reqwest::StatusCode::PROXY_AUTHENTICATION_REQUIRED + )); + } + + #[test] + fn unauthorized_error_displays_user_friendly_message() { + let err: n0_error::AnyError = Unauthorized.into(); + let msg = format!("{err}"); + assert!(!msg.is_empty(), "Unauthorized should have a Display impl"); + // The roundtrip downcast must work so callers can switch on auth failures. + assert!(err.downcast_ref::().is_some()); + } + + /// Models the retry behavior we expect from [`DatumCloudClient::request_with_auth_retry`]: + /// hit a 401 once, ask for a refreshed token, retry the same request with the new + /// token, and observe a 200. We exercise the pattern at the HTTP layer against a + /// local hyper server so the contract is pinned independent of the wider client. 
+ async fn run_with_auth_retry( + client: &reqwest::Client, + url: &str, + tokens: Arc, + outcome_log: Arc>>, + ) -> reqwest::Response { + let send = |bearer: &str| { + client + .get(url) + .header("Authorization", format!("Bearer {bearer}")) + }; + + let res = send(&tokens.current()).send().await.expect("first request"); + if !is_auth_failure(res.status()) { + outcome_log.lock().unwrap().push("first-ok"); + return res; + } + outcome_log.lock().unwrap().push("first-401"); + + tokens.rotate(); + outcome_log.lock().unwrap().push("refreshed"); + + let retry = send(&tokens.current()).send().await.expect("retry request"); + if is_auth_failure(retry.status()) { + outcome_log.lock().unwrap().push("retry-401-logout"); + } else { + outcome_log.lock().unwrap().push("retry-ok"); + } + retry + } + + struct TokenStash { + tokens: Mutex>, + } + impl TokenStash { + fn new(initial: &str) -> Arc { + Arc::new(Self { + tokens: Mutex::new(vec![initial.into()]), + }) + } + fn current(&self) -> String { + self.tokens.lock().unwrap().last().cloned().unwrap() + } + fn rotate(&self) { + let mut tokens = self.tokens.lock().unwrap(); + let next = format!("fresh-{}", tokens.len()); + tokens.push(next); + } + } + + async fn spawn_server(handler: H) -> (String, tokio::task::JoinHandle<()>) + where + H: Fn(Request) -> Response> + + Send + + Sync + + Clone + + 'static, + { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let url = format!("http://{addr}"); + let handle = tokio::spawn(async move { + loop { + let (stream, _) = match listener.accept().await { + Ok(v) => v, + Err(_) => return, + }; + let handler = handler.clone(); + tokio::spawn(async move { + let svc = service_fn(move |req| { + let handler = handler.clone(); + async move { Ok::<_, std::convert::Infallible>(handler(req)) } + }); + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(TokioIo::new(stream), svc) + .await; + }); + } + }); + (url, handle) + 
} + + fn auth_header(req: &Request) -> Option { + req.headers() + .get("authorization") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + } + + #[tokio::test] + async fn retry_succeeds_after_401_then_200() { + let calls = Arc::new(AtomicUsize::new(0)); + let calls_handler = calls.clone(); + let (url, handle) = spawn_server(move |req| { + let n = calls_handler.fetch_add(1, Ordering::SeqCst); + let bearer = auth_header(&req).unwrap_or_default(); + if n == 0 { + assert_eq!(bearer, "Bearer t0", "first request uses initial token"); + Response::builder() + .status(401) + .body(Full::new(Bytes::from("unauthorized"))) + .unwrap() + } else { + assert_eq!( + bearer, "Bearer fresh-1", + "retry uses the refreshed token from the stash" + ); + Response::builder() + .status(200) + .body(Full::new(Bytes::from("ok"))) + .unwrap() + } + }) + .await; + + let client = reqwest::Client::new(); + let tokens = TokenStash::new("t0"); + let log = Arc::new(Mutex::new(Vec::new())); + let res = run_with_auth_retry(&client, &url, tokens, log.clone()).await; + handle.abort(); + + assert!(res.status().is_success()); + assert_eq!(calls.load(Ordering::SeqCst), 2); + assert_eq!( + &*log.lock().unwrap(), + &["first-401", "refreshed", "retry-ok"] + ); + } + + #[tokio::test] + async fn retry_still_401_triggers_logout_path() { + let calls = Arc::new(AtomicUsize::new(0)); + let calls_handler = calls.clone(); + let (url, handle) = spawn_server(move |_req| { + calls_handler.fetch_add(1, Ordering::SeqCst); + Response::builder() + .status(401) + .body(Full::new(Bytes::from("still nope"))) + .unwrap() + }) + .await; + + let client = reqwest::Client::new(); + let tokens = TokenStash::new("t0"); + let log = Arc::new(Mutex::new(Vec::new())); + let res = run_with_auth_retry(&client, &url, tokens, log.clone()).await; + handle.abort(); + + // After two 401s we surface the failure and the caller is expected to clear auth. 
+ assert_eq!(res.status(), reqwest::StatusCode::UNAUTHORIZED); + assert_eq!(calls.load(Ordering::SeqCst), 2); + assert_eq!( + &*log.lock().unwrap(), + &["first-401", "refreshed", "retry-401-logout"] + ); + } +} diff --git a/lib/src/datum_cloud/auth.rs b/lib/src/datum_cloud/auth.rs index 66fbe7a..a811831 100644 --- a/lib/src/datum_cloud/auth.rs +++ b/lib/src/datum_cloud/auth.rs @@ -405,6 +405,10 @@ impl StatelessClient { .with_std_context(|_| format!("Failed to fetch user profile from {url}"))?; let status = res.status(); + if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN { + warn!(%url, "User profile fetch returned {status}"); + return Err(Unauthorized.into()); + } if !status.is_success() { let text = match res.text().await { Ok(text) => text, @@ -427,6 +431,13 @@ impl StatelessClient { #[error("Not logged in")] pub struct NotLoggedIn; +/// The server rejected the access token even after attempting a forced refresh. +/// The caller has already cleared the local auth state, so consumers can rely on +/// `login_state_watch` to drive a redirect to the login screen. +#[stack_error(derive)] +#[error("Authentication failed; signed out")] +pub struct Unauthorized; + #[derive(Default, Debug)] pub struct MaybeAuth(Option); @@ -714,8 +725,63 @@ impl AuthClient { Ok(()) } - /// Refresh the user profile from the API without refreshing tokens + /// Force a token refresh even if the access token is not near expiry. + /// + /// Used by API callers when the server rejects an apparently-valid token + /// (e.g. token revoked server-side, key rotation, perm change). On failure + /// the local auth state is cleared so the UI redirects to login. 
+ pub async fn force_refresh(&self) -> Result<()> { + let auth = self.state.load(); + let auth = auth.get()?; + let client = self.ensure_fresh_client().await?; + let new_auth = match client.refresh(&auth.tokens).await { + Ok(auth) => auth, + Err(err) => { + warn!("Failed to force-refresh auth tokens, logging out: {err:#}"); + self.state.set(None).await?; + Err(err).context("Failed to force-refresh auth tokens, needs login")? + } + }; + self.state.set(Some(new_auth)).await?; + Ok(()) + } + + /// Refresh the user profile from the API without refreshing tokens. + /// + /// If the API rejects the access token (401/403), this attempts one forced refresh + /// and retries. If the retry also fails with an auth error, the local auth state + /// is cleared and [`Unauthorized`] is returned. The watch on `login_state` then + /// drives a redirect to the login screen. pub async fn refresh_profile(&self) -> Result<()> { + match self.try_fetch_profile_once().await { + Ok(()) => return Ok(()), + Err(err) if err.downcast_ref::().is_some() => { + warn!("refresh_profile: server rejected token; forcing refresh"); + } + Err(err) => return Err(err), + } + + // One forced refresh and retry. If anything in this branch fails we treat it + // as a hard logout signal so the user doesn't sit on a half-functional session. + if let Err(err) = self.force_refresh().await { + warn!("refresh_profile: forced refresh failed: {err:#}"); + // force_refresh already cleared local auth on failure. 
+ return Err(Unauthorized.into()); + } + match self.try_fetch_profile_once().await { + Ok(()) => Ok(()), + Err(err) if err.downcast_ref::().is_some() => { + warn!("refresh_profile: auth failure persisted after refresh; logging out"); + if let Err(e) = self.logout().await { + warn!("refresh_profile: failed to clear auth state: {e:#}"); + } + Err(Unauthorized.into()) + } + Err(err) => Err(err), + } + } + + async fn try_fetch_profile_once(&self) -> Result<()> { let auth = self.state.load(); let auth = auth.get()?; let user_id = auth.profile.user_id.clone(); diff --git a/lib/src/heartbeat.rs b/lib/src/heartbeat.rs index 2a7ba1e..04c0331 100644 --- a/lib/src/heartbeat.rs +++ b/lib/src/heartbeat.rs @@ -8,7 +8,7 @@ use chrono::Utc; use k8s_openapi::apimachinery::pkg::apis::meta::v1::MicroTime; use kube::api::{ListParams, Patch, PatchParams}; use kube::{Api, ResourceExt}; -use n0_error::{Result, StdResultExt}; +use n0_error::Result; use n0_future::task::AbortOnDropHandle; use rand::Rng; use serde_json::json; @@ -23,6 +23,7 @@ use crate::datum_apis::connector::{ }; use crate::datum_apis::lease::Lease; use crate::datum_cloud::{DatumCloudClient, LoginState}; +use crate::project_control_plane::is_kube_auth_failure; type ProjectRunner = Arc< dyn Fn( @@ -261,7 +262,15 @@ async fn run_project( continue; } }; - let client = pcp.client(); + // Preflight refresh: ensure the kube client is built from a non-near-expiry token. 
+ let client = match pcp.client_refreshed().await { + Ok(client) => client, + Err(err) => { + warn!(%project_id, "heartbeat: failed to refresh pcp client: {err:#}"); + sleep_with_cancel(backoff.next(), &cancel).await; + continue; + } + }; let connectors: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); let leases: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); @@ -295,6 +304,11 @@ async fn run_project( } Err(err) => { warn!(%project_id, "heartbeat: connector lookup failed: {err:#}"); + if is_kube_auth_failure(&err) { + warn!(%project_id, "heartbeat: kube auth failure; logging out"); + let _ = datum.auth().logout().await; + return; + } sleep_with_cancel(backoff.next(), &cancel).await; continue; } @@ -331,6 +345,11 @@ async fn run_project( connector = %cached.name, "heartbeat: failed to fetch connector: {err:#}" ); + if is_kube_auth_failure(&err) { + warn!(%project_id, "heartbeat: kube auth failure; logging out"); + let _ = datum.auth().logout().await; + return; + } cache = None; sleep_with_cancel(backoff.next(), &cancel).await; continue; @@ -364,17 +383,25 @@ async fn run_project( if cached.last_details.as_ref() != Some(&details_value) { let patch = json!({ "status": { "connectionDetails": details_value } }); - if let Err(err) = connectors + match connectors .patch_status(&cached.name, &PatchParams::default(), &Patch::Merge(&patch)) .await { - warn!( - %project_id, - connector = %cached.name, - "heartbeat: failed to patch connection details: {err:#}" - ); - } else { - cached.last_details = Some(patch["status"]["connectionDetails"].clone()); + Ok(_) => { + cached.last_details = Some(patch["status"]["connectionDetails"].clone()); + } + Err(err) => { + warn!( + %project_id, + connector = %cached.name, + "heartbeat: failed to patch connection details: {err:#}" + ); + if is_kube_auth_failure(&err) { + warn!(%project_id, "heartbeat: kube auth failure; logging out"); + let _ = datum.auth().logout().await; + return; + } + } } } @@ -397,6 +424,11 @@ async 
fn run_project( lease = %lease_name, "heartbeat: failed to fetch lease: {err:#}" ); + if is_kube_auth_failure(&err) { + warn!(%project_id, "heartbeat: kube auth failure; logging out"); + let _ = datum.auth().logout().await; + return; + } cache = Some(cached); sleep_with_cancel(backoff.next(), &cancel).await; continue; @@ -417,6 +449,11 @@ async fn run_project( .await { warn!(%project_id, lease = %lease_name, "heartbeat: lease renew failed: {err:#}"); + if is_kube_auth_failure(&err) { + warn!(%project_id, "heartbeat: kube auth failure; logging out"); + let _ = datum.auth().logout().await; + return; + } cache = Some(cached); sleep_with_cancel(backoff.next(), &cancel).await; continue; @@ -438,21 +475,27 @@ async fn probe_connector( provider: Arc, ) -> Result { let pcp = datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); let selector = provider.endpoint_id(); - Ok(find_connector(&connectors, selector).await?.is_some()) + let result = pcp + .with_auth_retry(|client| { + let selector = selector.clone(); + async move { + let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + Ok(find_connector(&connectors, selector).await?.is_some()) + } + }) + .await?; + Ok(result) } async fn find_connector( connectors: &Api, endpoint_id: String, -) -> Result> { +) -> kube::Result> { let selector = format!("status.connectionDetails.publicKey.id={endpoint_id}"); let list = connectors .list(&ListParams::default().fields(&selector)) - .await - .std_context("failed to list connectors")?; + .await?; if list.items.is_empty() { return Ok(None); } diff --git a/lib/src/project_control_plane.rs b/lib/src/project_control_plane.rs index f0a70fb..7fa6ffa 100644 --- a/lib/src/project_control_plane.rs +++ b/lib/src/project_control_plane.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::Arc; use arc_swap::ArcSwap; @@ -9,7 +10,7 @@ use n0_future::task::AbortOnDropHandle; use 
secrecy::SecretString;
 use tracing::warn;
 
-use crate::datum_cloud::{DatumCloudClient, LoginState};
+use crate::datum_cloud::{DatumCloudClient, LoginState, Unauthorized};
 use crate::http_user_agent::datum_http_user_agent;
 
 #[derive(derive_more::Debug, Clone)]
@@ -67,6 +68,96 @@ impl ProjectControlPlaneClient {
         Ok(self.client())
     }
 
+    /// Build a `kube::Client` after forcing a token refresh, even if the access token
+    /// is not near expiry. Used by [`Self::with_auth_retry`] when the server has just
+    /// rejected the current bearer.
+    async fn client_force_refreshed(&self) -> Result<Client> {
+        self.datum.auth().force_refresh().await?;
+        let auth_state = self.datum.auth().load();
+        let auth = auth_state.get()?;
+        let access_token = auth.tokens.access_token.secret();
+        self.rebuild_if_changed(access_token)?;
+        Ok(self.client())
+    }
+
+    /// Run a kube operation with auth handling (idempotent / read variant):
+    ///
+    /// 1. Preflight: refresh the access token if near expiry and rebuild the client.
+    /// 2. Run the operation. On 401/403, force a refresh and retry once.
+    /// 3. If the retry returns 401, clear the local auth state and return
+    ///    [`Unauthorized`] so the `login_state` watch redirects to login. A
+    ///    persistent 403 (permission denied) is returned as an ordinary error.
+    ///
+    /// The closure is `Fn` so it can be invoked twice. Use this for idempotent
+    /// operations (lists, gets, deletes). For non-idempotent writes, use
+    /// [`Self::with_auth`] which preflights and logs out on a 401 without
+    /// retrying.
+    pub async fn with_auth_retry<F, Fut, T>(&self, op: F) -> Result<T>
+    where
+        F: Fn(Client) -> Fut,
+        Fut: Future<Output = kube::Result<T>>,
+    {
+        let client = self.client_refreshed().await?;
+        let first_err = match op(client).await {
+            Ok(val) => return Ok(val),
+            Err(err) => err,
+        };
+        if !is_kube_auth_failure(&first_err) {
+            return Err(first_err).std_context("kube operation failed");
+        }
+
+        warn!(
+            err = %first_err,
+            "kube returned auth failure; attempting forced token refresh"
+        );
+        let client = match self.client_force_refreshed().await {
+            Ok(client) => client,
+            Err(err) => {
+                warn!("Forced auth refresh failed: {err:#}");
+                return Err(Unauthorized.into());
+            }
+        };
+        match op(client).await {
+            Ok(val) => Ok(val),
+            Err(err) if matches!(&err, kube::Error::Api(resp) if resp.code == 401) => {
+                warn!(
+                    err = %err,
+                    "kube auth failure persisted after refresh; logging out"
+                );
+                if let Err(e) = self.datum.auth().logout().await {
+                    warn!("Failed to clear auth state after persistent 401: {e:#}");
+                }
+                Err(Unauthorized.into())
+            }
+            Err(err) => Err(err).std_context("kube operation failed after refresh"),
+        }
+    }
+
+    /// Run a kube operation with preflight refresh and auto-logout on a 401,
+    /// but without retrying. Use for non-idempotent writes (create/patch/etc.) where
+    /// re-running the closure would produce secondary errors like "AlreadyExists".
+    ///
+    /// On 401 the local auth state is cleared and [`Unauthorized`] is returned so the UI
+    /// routes back to login. A 403 (permission denied) is an ordinary error; no logout.
+    pub async fn with_auth<F, Fut, T>(&self, op: F) -> Result<T>
+    where
+        F: FnOnce(Client) -> Fut,
+        Fut: Future<Output = kube::Result<T>>,
+    {
+        let client = self.client_refreshed().await?;
+        match op(client).await {
+            Ok(val) => Ok(val),
+            Err(err) if matches!(&err, kube::Error::Api(resp) if resp.code == 401) => {
+                warn!(err = %err, "kube auth failure; logging out");
+                if let Err(e) = self.datum.auth().logout().await {
+                    warn!("Failed to clear auth state on 401: {e:#}");
+                }
+                Err(Unauthorized.into())
+            }
+            Err(err) => Err(err).std_context("kube operation failed"),
+        }
+    }
+
     fn build_kube_client(server_url: &str, access_token: &str) -> Result<Client> {
         let uri = server_url
             .parse()
@@ -135,3 +226,45 @@ impl ProjectControlPlaneClient {
         self._auth_task = Some(Arc::new(AbortOnDropHandle::new(task)));
     }
 }
+
+/// True if the kube API answered 401 or 403; a 403 alone may be a plain RBAC denial.
+pub fn is_kube_auth_failure(err: &kube::Error) -> bool {
+    matches!(err, kube::Error::Api(resp) if resp.code == 401 || resp.code == 403)
+}
+
+#[cfg(test)]
+mod is_kube_auth_failure_tests {
+    use super::*;
+    use kube::core::ErrorResponse;
+
+    fn api_err(code: u16) -> kube::Error {
+        kube::Error::Api(ErrorResponse {
+            status: "Failure".to_string(),
+            message: "denied".to_string(),
+            reason: "Unauthorized".to_string(),
+            code,
+        })
+    }
+
+    #[test]
+    fn classifies_401_and_403_api_errors() {
+        assert!(is_kube_auth_failure(&api_err(401)));
+        assert!(is_kube_auth_failure(&api_err(403)));
+    }
+
+    #[test]
+    fn ignores_other_api_status_codes() {
+        for code in [200u16, 404, 409, 410, 500, 503] {
+            assert!(
+                !is_kube_auth_failure(&api_err(code)),
+                "code {code} should not be an auth failure"
+            );
+        }
+    }
+
+    #[test]
+    fn ignores_non_api_errors() {
+        let err = kube::Error::TlsRequired;
+        assert!(!is_kube_auth_failure(&err));
+    }
+}
diff --git a/lib/src/tunnels.rs b/lib/src/tunnels.rs
index 5dc92f3..4d18aa8 100644
--- a/lib/src/tunnels.rs
+++ b/lib/src/tunnels.rs
@@ -186,28 +186,30 @@ impl TunnelService {
         let connector_name = connector.name_any();
 
         let pcp =
self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let ads: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); - - let proxy_list = proxies - .list(&ListParams::default()) - .await - .std_context("Failed to list HTTPProxy objects")?; - let ad_selector = format!("{ADVERTISEMENT_CONNECTOR_FIELD}={connector_name}"); - let ad_list = ads - .list(&ListParams::default().fields(&ad_selector)) - .await - .std_context("Failed to list ConnectorAdvertisement objects")?; - let enabled_by_name: HashMap = ad_list - .items + let (proxy_items, ad_items) = pcp + .with_auth_retry(|client| { + let ad_selector = ad_selector.clone(); + async move { + let proxies: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let ads: Api = + Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + let proxy_list = proxies.list(&ListParams::default()).await?; + let ad_list = ads + .list(&ListParams::default().fields(&ad_selector)) + .await?; + Ok((proxy_list.items, ad_list.items)) + } + }) + .await?; + let enabled_by_name: HashMap = ad_items .into_iter() .filter_map(|item| item.metadata.name.clone().map(|name| (name, item))) .collect(); let mut tunnels = Vec::new(); - for proxy in proxy_list.items { + for proxy in proxy_items { let Some(name) = proxy.metadata.name.clone() else { continue; }; @@ -312,146 +314,155 @@ impl TunnelService { let connector_name = connector.name_any(); let pcp = self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let ads: Api = - Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - - debug!( - %project_id, - connector = %connector_name, - endpoint = %endpoint, - "creating HTTPProxy" - ); - let mut proxy = HTTPProxy { - metadata: ObjectMeta { - generate_name: Some("tunnel-".to_string()), - annotations: Some(BTreeMap::from([( - 
DISPLAY_NAME_ANNOTATION.to_string(), - label.to_string(), - )])), - ..Default::default() - }, - spec: HTTPProxySpec { - hostnames: None, - rules: vec![ - https_redirect_rule(), - proxy_rule(&endpoint, &connector_name), - ], - }, - status: None, - }; - proxy = proxies - .create(&PostParams::default(), &proxy) - .await - .std_context("Failed to create HTTPProxy") - .inspect_err(|err| { - warn!( - %project_id, - connector = %connector_name, - endpoint = %endpoint, - "HTTPProxy create failed: {err:#}" + let create_traffic_protection_policies = self.create_traffic_protection_policies; + let project_id_owned = project_id.to_string(); + let connector_name_owned = connector_name.clone(); + let label_owned = label.to_string(); + let endpoint_owned = endpoint.clone(); + let proxy = pcp + .with_auth(|client| async move { + let proxies: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let ads: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + + debug!( + project_id = %project_id_owned, + connector = %connector_name_owned, + endpoint = %endpoint_owned, + "creating HTTPProxy" ); - })?; - let proxy_name = proxy.name_any(); - debug!( - %project_id, - proxy = %proxy_name, - connector = %connector_name, - "created HTTPProxy" - ); - - let ad_spec = advertisement_spec(&connector_name, target); - debug!( - %project_id, - proxy = %proxy_name, - connector = %connector_name, - "creating ConnectorAdvertisement" - ); - let ad = ConnectorAdvertisement { - metadata: ObjectMeta { - name: Some(proxy_name.clone()), - ..Default::default() - }, - spec: ad_spec, - status: None, - }; - ads.create(&PostParams::default(), &ad) - .await - .std_context("Failed to create ConnectorAdvertisement") - .inspect_err(|err| { - warn!( - %project_id, + let proxy = HTTPProxy { + metadata: ObjectMeta { + generate_name: Some("tunnel-".to_string()), + annotations: Some(BTreeMap::from([( + DISPLAY_NAME_ANNOTATION.to_string(), + label_owned.clone(), + )])), + ..Default::default() + }, + 
spec: HTTPProxySpec { + hostnames: None, + rules: vec![ + https_redirect_rule(), + proxy_rule(&endpoint_owned, &connector_name_owned), + ], + }, + status: None, + }; + let proxy = proxies + .create(&PostParams::default(), &proxy) + .await + .inspect_err(|err| { + warn!( + project_id = %project_id_owned, + connector = %connector_name_owned, + endpoint = %endpoint_owned, + "HTTPProxy create failed: {err:#}" + ); + })?; + let proxy_name = proxy.name_any(); + debug!( + project_id = %project_id_owned, proxy = %proxy_name, - connector = %connector_name, - "ConnectorAdvertisement create failed: {err:#}" + connector = %connector_name_owned, + "created HTTPProxy" ); - })?; - debug!( - %project_id, - proxy = %proxy_name, - connector = %connector_name, - "created ConnectorAdvertisement" - ); - - if self.create_traffic_protection_policies { - let tpps: Api = - Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - debug!( - %project_id, - proxy = %proxy_name, - "creating TrafficProtectionPolicy" - ); - let tpp = TrafficProtectionPolicy { - metadata: ObjectMeta { - name: Some(proxy_name.clone()), - ..Default::default() - }, - spec: TrafficProtectionPolicySpec { - target_refs: vec![LocalPolicyTargetReferenceWithSectionName { - group: "gateway.networking.k8s.io".to_string(), - kind: "Gateway".to_string(), - name: proxy_name.clone(), - section_name: None, - }], - mode: Some(TrafficProtectionPolicyMode::Enforce), - sampling_percentage: None, - rule_sets: Some(vec![TrafficProtectionPolicyRuleSet { - rule_set_type: TrafficProtectionPolicyRuleSetType::OWASPCoreRuleSet, - owasp_core_rule_set: Some(OWASPCRS { - paranoia_levels: Some(ParanoiaLevels { - blocking: Some(1), - detection: Some(1), - }), - score_thresholds: None, - rule_exclusions: None, - }), - }]), - }, - status: None, - }; - tpps.create(&PostParams::default(), &tpp) - .await - .std_context("Failed to create TrafficProtectionPolicy") - .inspect_err(|err| { - warn!( - %project_id, + + let ad_spec = 
advertisement_spec(&connector_name_owned, target); + debug!( + project_id = %project_id_owned, + proxy = %proxy_name, + connector = %connector_name_owned, + "creating ConnectorAdvertisement" + ); + let ad = ConnectorAdvertisement { + metadata: ObjectMeta { + name: Some(proxy_name.clone()), + ..Default::default() + }, + spec: ad_spec, + status: None, + }; + ads.create(&PostParams::default(), &ad) + .await + .inspect_err(|err| { + warn!( + project_id = %project_id_owned, + proxy = %proxy_name, + connector = %connector_name_owned, + "ConnectorAdvertisement create failed: {err:#}" + ); + })?; + debug!( + project_id = %project_id_owned, + proxy = %proxy_name, + connector = %connector_name_owned, + "created ConnectorAdvertisement" + ); + + if create_traffic_protection_policies { + let tpps: Api = + Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + debug!( + project_id = %project_id_owned, proxy = %proxy_name, - "TrafficProtectionPolicy create failed: {err:#}" + "creating TrafficProtectionPolicy" ); - })?; - debug!( - %project_id, - proxy = %proxy_name, - "created TrafficProtectionPolicy" - ); - } else { - debug!( - %project_id, - proxy = %proxy_name, - "skipping TrafficProtectionPolicy creation (env disabled)" - ); - } + let tpp = TrafficProtectionPolicy { + metadata: ObjectMeta { + name: Some(proxy_name.clone()), + ..Default::default() + }, + spec: TrafficProtectionPolicySpec { + target_refs: vec![LocalPolicyTargetReferenceWithSectionName { + group: "gateway.networking.k8s.io".to_string(), + kind: "Gateway".to_string(), + name: proxy_name.clone(), + section_name: None, + }], + mode: Some(TrafficProtectionPolicyMode::Enforce), + sampling_percentage: None, + rule_sets: Some(vec![TrafficProtectionPolicyRuleSet { + rule_set_type: TrafficProtectionPolicyRuleSetType::OWASPCoreRuleSet, + owasp_core_rule_set: Some(OWASPCRS { + paranoia_levels: Some(ParanoiaLevels { + blocking: Some(1), + detection: Some(1), + }), + score_thresholds: None, + rule_exclusions: None, + }), + }]), + 
}, + status: None, + }; + tpps.create(&PostParams::default(), &tpp) + .await + .inspect_err(|err| { + warn!( + project_id = %project_id_owned, + proxy = %proxy_name, + "TrafficProtectionPolicy create failed: {err:#}" + ); + })?; + debug!( + project_id = %project_id_owned, + proxy = %proxy_name, + "created TrafficProtectionPolicy" + ); + } else { + debug!( + project_id = %project_id_owned, + proxy = %proxy_name, + "skipping TrafficProtectionPolicy creation (env disabled)" + ); + } + + Ok(proxy) + }) + .await?; + let proxy_name = proxy.name_any(); let proxy_state = proxy_state_from_summary(&proxy_name, &endpoint, label, true)?; if self.publish_tickets { @@ -499,48 +510,51 @@ impl TunnelService { let connector_name = connector.name_any(); let pcp = self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let ads: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); - - let existing = proxies - .get(tunnel_id) - .await - .std_context("Failed to fetch HTTPProxy")?; - let hostnames = existing.spec.hostnames.clone().unwrap_or_default(); - - let patch = json!({ - "metadata": { - "annotations": { - DISPLAY_NAME_ANNOTATION: label, + let tunnel_id_owned = tunnel_id.to_string(); + let label_owned = label.to_string(); + let endpoint_owned = endpoint.clone(); + let connector_name_owned = connector_name.clone(); + let (existing, enabled) = pcp + .with_auth(|client| async move { + let proxies: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let ads: Api = + Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + + let existing = proxies.get(&tunnel_id_owned).await?; + let hostnames = existing.spec.hostnames.clone().unwrap_or_default(); + + let patch = json!({ + "metadata": { + "annotations": { + DISPLAY_NAME_ANNOTATION: label_owned, + } + }, + "spec": { + "hostnames": hostnames, + "rules": [https_redirect_rule(), proxy_rule(&endpoint_owned, 
&connector_name_owned)], + } + }); + proxies + .patch(&tunnel_id_owned, &PatchParams::default(), &Patch::Merge(&patch)) + .await?; + + if let Some(_existing_ad) = ads.get_opt(&tunnel_id_owned).await? { + let ad_patch = json!({ + "spec": advertisement_spec(&connector_name_owned, target) + }); + ads.patch( + &tunnel_id_owned, + &PatchParams::default(), + &Patch::Merge(&ad_patch), + ) + .await?; } - }, - "spec": { - "hostnames": hostnames, - "rules": [https_redirect_rule(), proxy_rule(&endpoint, &connector_name)], - } - }); - proxies - .patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&patch)) - .await - .std_context("Failed to update HTTPProxy")?; - - if let Ok(existing_ad) = ads.get_opt(tunnel_id).await - && existing_ad.is_some() - { - let ad_patch = json!({ - "spec": advertisement_spec(&connector_name, target) - }); - ads.patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&ad_patch)) - .await - .std_context("Failed to update ConnectorAdvertisement")?; - } - let enabled = ads - .get_opt(tunnel_id) - .await - .std_context("Failed to load ConnectorAdvertisement")? - .is_some(); + let enabled = ads.get_opt(&tunnel_id_owned).await?.is_some(); + Ok((existing, enabled)) + }) + .await?; let summary = TunnelSummary { id: tunnel_id.to_string(), @@ -589,14 +603,19 @@ impl TunnelService { let connector_name = connector.name_any(); let pcp = self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let ads: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); - let proxy = proxies - .get(tunnel_id) - .await - .std_context("Failed to fetch HTTPProxy")?; + // Fetch the existing proxy first so we can derive endpoint/label up front; note the + // (fallible, non-kube) target parsing still runs inside the second auth closure below. 
+ let tunnel_id_for_get = tunnel_id.to_string(); + let proxy = pcp + .with_auth_retry(move |client| { + let tunnel_id = tunnel_id_for_get.clone(); + async move { + let proxies: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + proxies.get(&tunnel_id).await + } + }) + .await?; let endpoint = normalize_endpoint(&proxy_backend_endpoint(&proxy).unwrap_or_default()); let label = proxy .metadata @@ -606,44 +625,54 @@ impl TunnelService { .cloned() .unwrap_or_else(|| tunnel_id.to_string()); - if enabled { - let target = parse_target(&endpoint)?; - let ad_spec = advertisement_spec(&connector_name, target); - match ads - .get_opt(tunnel_id) - .await - .std_context("Failed to load ConnectorAdvertisement")? - { - Some(_) => { - let ad_patch = json!({ "spec": ad_spec }); - ads.patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&ad_patch)) - .await - .std_context("Failed to update ConnectorAdvertisement")?; - } - None => { - let ad = ConnectorAdvertisement { - metadata: ObjectMeta { - name: Some(tunnel_id.to_string()), - ..Default::default() - }, - spec: ad_spec, - status: None, - }; - ads.create(&PostParams::default(), &ad) - .await - .std_context("Failed to create ConnectorAdvertisement")?; + let tunnel_id_owned = tunnel_id.to_string(); + let connector_name_owned = connector_name.clone(); + let endpoint_for_closure = endpoint.clone(); + pcp.with_auth(|client| async move { + let ads: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + if enabled { + // parse_target is fallible: `endpoint` comes from the stored proxy (or an empty + // default when it has no backend), so the error is mapped explicitly here. + let target = match parse_target(&endpoint_for_closure) { + Ok(t) => t, + Err(err) => { + // Surface as a kube Service error so it round-trips through the + // helper; the caller still sees a wrapped n0_error. 
+ return Err(kube::Error::Service(Box::new(std::io::Error::other( + err.to_string(), + )))); + } + }; + let ad_spec = advertisement_spec(&connector_name_owned, target); + match ads.get_opt(&tunnel_id_owned).await? { + Some(_) => { + let ad_patch = json!({ "spec": ad_spec }); + ads.patch( + &tunnel_id_owned, + &PatchParams::default(), + &Patch::Merge(&ad_patch), + ) + .await?; + } + None => { + let ad = ConnectorAdvertisement { + metadata: ObjectMeta { + name: Some(tunnel_id_owned.clone()), + ..Default::default() + }, + spec: ad_spec, + status: None, + }; + ads.create(&PostParams::default(), &ad).await?; + } } + } else if ads.get_opt(&tunnel_id_owned).await?.is_some() { + ads.delete(&tunnel_id_owned, &DeleteParams::default()) + .await?; } - } else if ads - .get_opt(tunnel_id) - .await - .std_context("Failed to load ConnectorAdvertisement")? - .is_some() - { - ads.delete(tunnel_id, &DeleteParams::default()) - .await - .std_context("Failed to delete ConnectorAdvertisement")?; - } + Ok(()) + }) + .await?; let summary = TunnelSummary { id: tunnel_id.to_string(), @@ -697,47 +726,68 @@ impl TunnelService { let connector_name = connector.name_any(); let pcp = self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let ads: Api = - Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - let connectors: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - - if proxies - .get_opt(tunnel_id) - .await - .std_context("Failed to load HTTPProxy")? - .is_some() - { - proxies - .delete(tunnel_id, &DeleteParams::default()) - .await - .std_context("Failed to delete HTTPProxy")?; - } - if ads - .get_opt(tunnel_id) - .await - .std_context("Failed to load ConnectorAdvertisement")? 
- .is_some() - { - ads.delete(tunnel_id, &DeleteParams::default()) - .await - .std_context("Failed to delete ConnectorAdvertisement")?; - } + let tunnel_id_owned = tunnel_id.to_string(); + let connector_name_owned = connector_name.clone(); + let connector_deleted = pcp + .with_auth(|client| async move { + let proxies: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let ads: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let connectors: Api = + Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + + if proxies.get_opt(&tunnel_id_owned).await?.is_some() { + proxies + .delete(&tunnel_id_owned, &DeleteParams::default()) + .await?; + } - let tpps: Api = - Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); - if tpps - .get_opt(tunnel_id) - .await - .std_context("Failed to load TrafficProtectionPolicy")? - .is_some() - { - tpps.delete(tunnel_id, &DeleteParams::default()) - .await - .std_context("Failed to delete TrafficProtectionPolicy")?; - } + if ads.get_opt(&tunnel_id_owned).await?.is_some() { + ads.delete(&tunnel_id_owned, &DeleteParams::default()) + .await?; + } + + let tpps: Api = + Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + if tpps.get_opt(&tunnel_id_owned).await?.is_some() { + tpps.delete(&tunnel_id_owned, &DeleteParams::default()) + .await?; + } + + let remaining = proxies.list(&ListParams::default()).await?; + let mut connector_deleted = false; + let mut remaining_for_connector = remaining + .items + .into_iter() + .filter(|proxy| proxy_uses_connector(proxy, &connector_name_owned)) + .peekable(); + if remaining_for_connector.peek().is_none() { + let ad_selector = + format!("{ADVERTISEMENT_CONNECTOR_FIELD}={connector_name_owned}"); + let ads_list = ads + .list(&ListParams::default().fields(&ad_selector)) + .await?; + for ad in ads_list.items { + if let Some(name) = ad.metadata.name.clone() + && let Err(err) = ads.delete(&name, &DeleteParams::default()).await + { + warn!(%name, "Failed to delete connector 
advertisement: {err:#}"); + } + } + + if connectors.get_opt(&connector_name_owned).await?.is_some() { + connectors + .delete(&connector_name_owned, &DeleteParams::default()) + .await?; + connector_deleted = true; + } + } + + Ok(connector_deleted) + }) + .await?; if self.publish_tickets { debug!(%tunnel_id, "unpublishing ticket for tunnel"); @@ -748,44 +798,6 @@ impl TunnelService { warn!(%tunnel_id, "Failed to remove proxy state: {err:#}"); } - let remaining = proxies - .list(&ListParams::default()) - .await - .std_context("Failed to list remaining HTTPProxy objects")?; - let mut connector_deleted = false; - let mut remaining_for_connector = remaining - .items - .into_iter() - .filter(|proxy| proxy_uses_connector(proxy, &connector_name)) - .peekable(); - if remaining_for_connector.peek().is_none() { - let ad_selector = format!("{ADVERTISEMENT_CONNECTOR_FIELD}={connector_name}"); - let ads_list = ads - .list(&ListParams::default().fields(&ad_selector)) - .await - .std_context("Failed to list remaining ConnectorAdvertisements")?; - for ad in ads_list.items { - if let Some(name) = ad.metadata.name.clone() - && let Err(err) = ads.delete(&name, &DeleteParams::default()).await - { - warn!(%name, "Failed to delete connector advertisement: {err:#}"); - } - } - - if connectors - .get_opt(&connector_name) - .await - .std_context("Failed to load Connector")? 
- .is_some() - { - connectors - .delete(&connector_name, &DeleteParams::default()) - .await - .std_context("Failed to delete Connector")?; - connector_deleted = true; - } - } - Ok(TunnelDeleteOutcome { project_id: project_id.to_string(), connector_deleted, @@ -794,73 +806,91 @@ impl TunnelService { async fn find_connector(&self, project_id: &str) -> Result> { let pcp = self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); let endpoint_id = self.listen.endpoint_id().to_string(); let selector = format!("{CONNECTOR_SELECTOR_FIELD}={endpoint_id}"); - let list = connectors - .list(&ListParams::default().fields(&selector)) - .await - .std_context("Failed to list connectors")?; - if list.items.is_empty() { - let fallback = connectors - .list(&ListParams::default()) - .await - .std_context("Failed to list connectors for fallback")?; - if fallback.items.len() != 1 { - if !fallback.items.is_empty() { - warn!( - %project_id, - count = fallback.items.len(), - "Multiple connectors found without status match" - ); - } - return Ok(None); - } - let mut connector = fallback.items.into_iter().next().unwrap(); - let needs_patch = connector - .status - .as_ref() - .and_then(|status| status.connection_details.as_ref()) - .and_then(|details| details.public_key.as_ref()) - .map(|details| details.id.as_str() != endpoint_id.as_str()) - .unwrap_or(true); - if needs_patch && let Some(details) = build_connection_details(&self.listen) { - let details_value = serde_json::to_value(details) - .std_context("Failed to serialize connection details")?; - let patch = json!({ "status": { "connectionDetails": details_value } }); - if let Err(err) = connectors - .patch_status( - &connector.name_any(), - &PatchParams::default(), - &Patch::Merge(&patch), + let project_id_owned = project_id.to_string(); + + let connection_details = build_connection_details(&self.listen); + let device_annotations_value 
= device_annotations(); + + let connector = pcp + .with_auth_retry(|client| { + let selector = selector.clone(); + let endpoint_id = endpoint_id.clone(); + let project_id = project_id_owned.clone(); + let connection_details = connection_details.clone(); + let device_annotations_value = device_annotations_value.clone(); + async move { + let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + let list = connectors + .list(&ListParams::default().fields(&selector)) + .await?; + let mut connector = if list.items.is_empty() { + let fallback = connectors.list(&ListParams::default()).await?; + if fallback.items.len() != 1 { + if !fallback.items.is_empty() { + warn!( + %project_id, + count = fallback.items.len(), + "Multiple connectors found without status match" + ); + } + return Ok(None); + } + let mut connector = fallback.items.into_iter().next().unwrap(); + let needs_patch = connector + .status + .as_ref() + .and_then(|status| status.connection_details.as_ref()) + .and_then(|details| details.public_key.as_ref()) + .map(|details| details.id.as_str() != endpoint_id.as_str()) + .unwrap_or(true); + if needs_patch && let Some(details) = connection_details.as_ref() { + let details_value = match serde_json::to_value(details) { + Ok(v) => v, + Err(err) => { + return Err(kube::Error::SerdeError(err)); + } + }; + let patch = json!({ "status": { "connectionDetails": details_value } }); + if let Err(err) = connectors + .patch_status( + &connector.name_any(), + &PatchParams::default(), + &Patch::Merge(&patch), + ) + .await + { + warn!( + connector = %connector.name_any(), + "Failed to patch connector status: {err:#}" + ); + } else { + connector = connectors.get(&connector.name_any()).await?; + } + } + connector + } else { + if list.items.len() > 1 { + debug!( + %selector, + count = list.items.len(), + "Multiple connectors found for endpoint, using first" + ); + } + list.items.into_iter().next().unwrap() + }; + patch_device_annotations( + &connectors, + &mut 
connector, + &device_annotations_value, ) - .await - { - warn!( - connector = %connector.name_any(), - "Failed to patch connector status: {err:#}" - ); - } else { - connector = connectors - .get(&connector.name_any()) - .await - .std_context("Failed to reload connector after patch")?; + .await; + Ok(Some(connector)) } - } - patch_device_annotations(&connectors, &mut connector).await; - return Ok(Some(connector)); - } - if list.items.len() > 1 { - debug!( - %selector, - count = list.items.len(), - "Multiple connectors found for endpoint, using first" - ); - } - let mut connector = list.items.into_iter().next().unwrap(); - patch_device_annotations(&connectors, &mut connector).await; - Ok(Some(connector)) + }) + .await?; + Ok(connector) } async fn ensure_connector(&self, project_id: &str) -> Result { @@ -869,43 +899,56 @@ impl TunnelService { } let pcp = self.datum.project_control_plane_client(project_id).await?; - let client = pcp.client(); - let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); - - let mut connector = Connector { - metadata: ObjectMeta { - generate_name: Some("datum-connect-".to_string()), - annotations: Some(device_annotations()), - ..Default::default() - }, - spec: ConnectorSpec { - connector_class_name: DEFAULT_CONNECTOR_CLASS_NAME.to_string(), - capabilities: None, - }, - status: None, - }; - connector = connectors - .create(&PostParams::default(), &connector) - .await - .std_context("Failed to create Connector")?; - - if let Some(details) = build_connection_details(&self.listen) { - let details_value = serde_json::to_value(details) - .std_context("Failed to serialize connection details")?; - let patch = json!({ "status": { "connectionDetails": details_value } }); - if let Err(err) = connectors - .patch_status( - &connector.name_any(), - &PatchParams::default(), - &Patch::Merge(&patch), - ) - .await - { - warn!(connector = %connector.name_any(), "Failed to patch connector status: {err:#}"); - } - } else { - warn!(connector = 
%connector.name_any(), "Missing connection details for connector status"); - } + let connection_details = build_connection_details(&self.listen); + let device_annotations_value = device_annotations(); + + let connector = pcp + .with_auth(|client| async move { + let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + + let connector = Connector { + metadata: ObjectMeta { + generate_name: Some("datum-connect-".to_string()), + annotations: Some(device_annotations_value), + ..Default::default() + }, + spec: ConnectorSpec { + connector_class_name: DEFAULT_CONNECTOR_CLASS_NAME.to_string(), + capabilities: None, + }, + status: None, + }; + let connector = connectors + .create(&PostParams::default(), &connector) + .await?; + + if let Some(details) = connection_details.as_ref() { + let details_value = + serde_json::to_value(details).map_err(kube::Error::SerdeError)?; + let patch = json!({ "status": { "connectionDetails": details_value } }); + if let Err(err) = connectors + .patch_status( + &connector.name_any(), + &PatchParams::default(), + &Patch::Merge(&patch), + ) + .await + { + warn!( + connector = %connector.name_any(), + "Failed to patch connector status: {err:#}" + ); + } + } else { + warn!( + connector = %connector.name_any(), + "Missing connection details for connector status" + ); + } + + Ok(connector) + }) + .await?; Ok(connector) } @@ -1129,8 +1172,11 @@ fn device_annotations() -> BTreeMap { ]) } -async fn patch_device_annotations(api: &Api, connector: &mut Connector) { - let expected = device_annotations(); +async fn patch_device_annotations( + api: &Api, + connector: &mut Connector, + expected: &BTreeMap, +) { let current = connector.metadata.annotations.as_ref(); let needs_patch = expected.iter().any(|(k, v)| { current diff --git a/ui/assets/tailwind.css b/ui/assets/tailwind.css index 1dc5feb..05f175f 100644 --- a/ui/assets/tailwind.css +++ b/ui/assets/tailwind.css @@ -29,6 +29,7 @@ --font-weight-medium: 500; --font-weight-semibold: 600; 
--font-weight-bold: 700; + --leading-relaxed: 1.625; --radius-md: 0.375rem; --radius-xl: 0.75rem; --radius-2xl: 1rem; @@ -357,6 +358,9 @@ .block { display: block; } + .contents { + display: contents; + } .flex { display: flex; } @@ -696,6 +700,9 @@ .overflow-y-auto { overflow-y: auto; } + .rounded { + border-radius: var(--radius-md); + } .rounded-2xl { border-radius: var(--radius-2xl); } @@ -1016,6 +1023,10 @@ --tw-leading: 1; line-height: 1; } + .leading-relaxed { + --tw-leading: var(--leading-relaxed); + line-height: var(--leading-relaxed); + } .font-bold { --tw-font-weight: var(--font-weight-bold); font-weight: var(--font-weight-bold); @@ -1175,6 +1186,10 @@ -webkit-backdrop-filter: var(--tw-backdrop-blur,) var(--tw-backdrop-brightness,) var(--tw-backdrop-contrast,) var(--tw-backdrop-grayscale,) var(--tw-backdrop-hue-rotate,) var(--tw-backdrop-invert,) var(--tw-backdrop-opacity,) var(--tw-backdrop-saturate,) var(--tw-backdrop-sepia,); backdrop-filter: var(--tw-backdrop-blur,) var(--tw-backdrop-brightness,) var(--tw-backdrop-contrast,) var(--tw-backdrop-grayscale,) var(--tw-backdrop-hue-rotate,) var(--tw-backdrop-invert,) var(--tw-backdrop-opacity,) var(--tw-backdrop-saturate,) var(--tw-backdrop-sepia,); } + .backdrop-filter { + -webkit-backdrop-filter: var(--tw-backdrop-blur,) var(--tw-backdrop-brightness,) var(--tw-backdrop-contrast,) var(--tw-backdrop-grayscale,) var(--tw-backdrop-hue-rotate,) var(--tw-backdrop-invert,) var(--tw-backdrop-opacity,) var(--tw-backdrop-saturate,) var(--tw-backdrop-sepia,); + backdrop-filter: var(--tw-backdrop-blur,) var(--tw-backdrop-brightness,) var(--tw-backdrop-contrast,) var(--tw-backdrop-grayscale,) var(--tw-backdrop-hue-rotate,) var(--tw-backdrop-invert,) var(--tw-backdrop-opacity,) var(--tw-backdrop-saturate,) var(--tw-backdrop-sepia,); + } .transition-colors { transition-property: color, background-color, border-color, outline-color, text-decoration-color, fill, stroke, --tw-gradient-from, --tw-gradient-via, 
--tw-gradient-to; transition-timing-function: var(--tw-ease, var(--default-transition-timing-function)); diff --git a/ui/src/state.rs b/ui/src/state.rs index b0da2d8..4a095b1 100644 --- a/ui/src/state.rs +++ b/ui/src/state.rs @@ -3,6 +3,7 @@ use lib::{ datum_cloud::{ApiEnv, DatumCloudClient}, HeartbeatAgent, ListenNode, Node, Repo, SelectedContext, TunnelService, TunnelSummary, }; +use std::collections::HashSet; use tokio::sync::Notify; use tracing::info; @@ -13,6 +14,13 @@ pub struct AppState { heartbeat: HeartbeatAgent, tunnel_refresh: std::sync::Arc, tunnel_cache: dioxus::signals::Signal>, + /// IDs of tunnels we've just deleted locally but whose backend resources + /// (HTTPProxy + ConnectorAdvertisement + …) may still appear in the next + /// few `list_active` polls while Kubernetes is reaping them. Tombstones + /// suppress the UI from showing a half-deleted tunnel with the toggle + /// flipped off; they are cleared automatically by `proxies_list` once the + /// API stops returning the ID. + pending_deletions: dioxus::signals::Signal>, } impl AppState { @@ -32,6 +40,7 @@ impl AppState { heartbeat, tunnel_refresh: std::sync::Arc::new(Notify::new()), tunnel_cache: dioxus::signals::Signal::new(Vec::new()), + pending_deletions: dioxus::signals::Signal::new(HashSet::new()), }; Ok(app_state) } @@ -91,6 +100,36 @@ impl AppState { cache.set(list); } + pub fn pending_deletions(&self) -> dioxus::signals::Signal> { + self.pending_deletions + } + + /// Mark a tunnel as deleted locally. Subsequent `list_active` polls will + /// suppress this ID from the merged tunnel list until the API stops + /// returning it (handled by `proxies_list`). 
+ pub fn add_pending_deletion(&self, tunnel_id: String) { + let mut signal = self.pending_deletions; + let mut set = signal(); + set.insert(tunnel_id); + signal.set(set); + } + + /// Drop tombstones for IDs the backend no longer reports — typically + /// called from the poll loop with the latest API-returned IDs so a + /// re-created tunnel with the same name can reappear later. + pub fn reconcile_pending_deletions(&self, api_ids: &HashSet) { + let mut signal = self.pending_deletions; + let set = signal(); + let next: HashSet = set + .iter() + .filter(|id| api_ids.contains(*id)) + .cloned() + .collect(); + if next.len() != set.len() { + signal.set(next); + } + } + pub fn selected_context(&self) -> Option { self.datum.selected_context() } diff --git a/ui/src/views/proxies_list.rs b/ui/src/views/proxies_list.rs index 79c0ce7..6857e11 100644 --- a/ui/src/views/proxies_list.rs +++ b/ui/src/views/proxies_list.rs @@ -50,33 +50,67 @@ pub fn ProxiesList() -> Element { let tunnels = state.tunnel_cache(); // Check if we already have cached data - if so, we're already "loaded" let has_loaded = use_signal(|| !tunnels().is_empty()); + let load_error = use_signal(|| None::); let state_for_future = state.clone(); use_future(move || { let state_for_future = state_for_future.clone(); let mut has_loaded_for_future = has_loaded; + let mut load_error_for_future = load_error; async move { let mut ctx_rx = state_for_future.datum().selected_context_watch(); let refresh = state_for_future.tunnel_refresh(); loop { - let list = state_for_future - .tunnel_service() - .list_active() - .await - .unwrap_or_default(); - // Merge with current cache so we don't briefly show all tunnels as pending when - // a single tunnel is updated (backend may return stale/reconciling state). - let cache_signal = state_for_future.tunnel_cache(); - let current = cache_signal(); - let list = merge_tunnel_list_with_cache(current, list); - // Check if any tunnel is missing a hostname or not yet accepted/programmed. 
- // If so, poll more frequently. - // TODO(zachsmith1): When pending, poll only the specific HTTPProxy - // resource(s) instead of listing all tunnels each cycle. - let has_pending_hostname = list.iter().any(|t| t.hostnames.is_empty()); - let has_pending_status = list.iter().any(|t| !t.accepted || !t.programmed); - state_for_future.set_tunnel_cache(list); + // Don't overwrite the cache with an empty list on failure; keep the last good + // data so the UI doesn't briefly clear while we report the error. On auth + // failures the lib layer calls `auth.logout()` for us, which Chrome observes + // and uses to redirect to the login screen. + let (list, has_pending_hostname, has_pending_status) = + match state_for_future.tunnel_service().list_active().await { + Ok(list) => { + // Drop tunnels we just deleted locally that the backend hasn't + // finished reaping yet. Without this, a still-terminating + // HTTPProxy whose ConnectorAdvertisement is already gone would + // reappear in the UI with `enabled = false` (toggle "flips off") + // until the proxy is fully removed, forcing the user to click + // Delete a second time. + let api_ids: std::collections::HashSet = + list.iter().map(|t| t.id.clone()).collect(); + state_for_future.reconcile_pending_deletions(&api_ids); + let pending = state_for_future.pending_deletions()(); + let list: Vec<_> = list + .into_iter() + .filter(|t| !pending.contains(&t.id)) + .collect(); + + // Merge with current cache so we don't briefly show all tunnels as + // pending when a single tunnel is updated (backend may return + // stale/reconciling state). + let cache_signal = state_for_future.tunnel_cache(); + let current = cache_signal(); + let list = merge_tunnel_list_with_cache(current, list); + // TODO(zachsmith1): When pending, poll only the specific HTTPProxy + // resource(s) instead of listing all tunnels each cycle. 
+ let has_pending_hostname = list.iter().any(|t| t.hostnames.is_empty()); + let has_pending_status = + list.iter().any(|t| !t.accepted || !t.programmed); + state_for_future.set_tunnel_cache(list.clone()); + load_error_for_future.set(None); + (list, has_pending_hostname, has_pending_status) + } + Err(err) => { + tracing::warn!("failed to list tunnels: {err:#}"); + load_error_for_future.set(Some(err.to_string())); + let cache_signal = state_for_future.tunnel_cache(); + let list = cache_signal(); + let has_pending_hostname = list.iter().any(|t| t.hostnames.is_empty()); + let has_pending_status = + list.iter().any(|t| !t.accepted || !t.programmed); + (list, has_pending_hostname, has_pending_status) + } + }; has_loaded_for_future.set(true); + let _ = list; if has_pending_hostname || has_pending_status { // Poll every 3 seconds when waiting for hostname provisioning @@ -123,6 +157,11 @@ pub fn ProxiesList() -> Element { .deregister_project(&outcome.project_id) .await; } + // Tombstone the ID so the next list_active poll doesn't resurrect + // the tunnel with `enabled = false` while Kubernetes is still + // reaping the HTTPProxy. The poll loop clears the tombstone once + // the API confirms the resource is gone. + state.add_pending_deletion(tunnel.id.clone()); state.remove_tunnel(&tunnel.id); state.bump_tunnel_refresh(); n0_error::Ok(()) @@ -283,7 +322,15 @@ pub fn ProxiesList() -> Element { }; rsx! { - div { class: "max-w-5xl mx-auto", {list} } + div { class: "max-w-5xl mx-auto", + if let Some(err) = load_error() { + div { class: "mb-4 rounded-lg border border-red-200 bg-red-50 px-4 py-3 text-alert-red-dark", + div { class: "text-sm font-semibold", "Failed to load tunnels" } + div { class: "text-xs mt-1 break-words", "{err}" } + } + } + {list} + } AddTunnelDialog { open: dialog_open, on_open_change: move |open| {