From c54eb4a15c9bd9a319e2be2637bee44bfabf6cb3 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Fri, 6 Mar 2026 12:13:23 +0100 Subject: [PATCH 01/18] vars.h5 attr fix --- R/artifacts.R | 20 +++++++++++--------- R/cytetype.R | 5 ++++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/R/artifacts.R b/R/artifacts.R index 47eeb2e..9130de0 100644 --- a/R/artifacts.R +++ b/R/artifacts.R @@ -64,8 +64,8 @@ rhdf5::h5writeDataset(.as_string_values(vec), h5loc = fid, name = col_path) } did <- rhdf5::H5Dopen(fid, col_path) - rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name") - rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype") + rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name", asScalar = TRUE) + rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype", asScalar = TRUE) rhdf5::H5Dclose(did) } invisible(NULL) @@ -73,10 +73,10 @@ # Write a sparse matrix under a named HDF5 group. # csr = FALSE (default): CSC — indptr over columns (genes), indices are row (cell) indices. -# Input m must be cells × genes (n_obs × n_vars). +# Input m must be cells x genes (n_obs x n_vars). # csr = TRUE: CSR — indptr over rows (cells), indices are column (gene) indices. -# Input m must be genes × cells (n_vars × n_obs) as returned by Seurat GetAssayData. -# Stored via CSC(genes × cells) ≡ CSR(cells × genes); no transpose needed. +# Input m must be genes x cells (n_vars x n_obs) as returned by Seurat GetAssayData. +# Stored via CSC(genes x cells) == CSR(cells x genes); no transpose needed. .write_sparse_group <- function(fid, group, m, n_obs, col_batch, chunk_size, csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT") { if (csr) { @@ -86,11 +86,12 @@ n_vars <- ncol(m) } n_cols <- ncol(m) + rhdf5::h5createGroup(fid, group) gid <- rhdf5::H5Gopen(fid, group) on.exit(rhdf5::H5Gclose(gid), add = TRUE) - rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs") - rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars") + rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs", asScalar = TRUE) + rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars", asScalar = TRUE) rhdf5::h5createDataset(fid, paste0(group, "/indices"), dims = 0L, maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size, @@ -107,7 +108,7 @@ chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix") chunk_indices <- as.integer(chunk@i) chunk_data <- if (data_h5type == "H5T_NATIVE_INT32") as.integer(chunk@x) else as.numeric(chunk@x) - chunk_nnz <- length(chunk_indices) + chunk_nnz <- length(chunk_indices) if (chunk_nnz > 0L) { rhdf5::h5set_extent(fid, paste0(group, "/indices"), current_size + chunk_nnz) rhdf5::h5writeDataset(chunk_indices, h5loc = fid, name = paste0(group, "/indices"), @@ -159,7 +160,8 @@ } if (!is.null(feature_df)) { - .write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df, feature_names = feature_names) + .write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df, + feature_names = feature_names) } invisible(out_file) diff --git a/R/cytetype.R b/R/cytetype.R index ca4248a..20768ea 100644 --- a/R/cytetype.R +++ b/R/cytetype.R @@ -186,7 +186,8 @@ PrepareCyteTypeR <- function(obj, group_key = group_key, build_succeeded = build_succeeded, vars_h5_path = vars_h5_path, - obs_duckdb_path = obs_duckdb_path + obs_duckdb_path = obs_duckdb_path, + coordinates_key = coordinates_key ) # Store query obj@misc$query <- prepped_data @@ -308,6 +309,8 @@ CyteTypeR <- function(obj, group_key <- prepped_data$group_key prepped_data$group_key <- NULL + coordinates_key <- prepped_data$coordinates_key %||% "umap" + prepped_data$coordinates_key <- NULL .validate_input_data(prepped_data) From ba5e9808dcd893cc99266cbc5ca006af2560c2fe Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Fri, 6 Mar 2026 12:14:59 +0100 Subject: [PATCH 02/18] Increment version number to 0.4.1 --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 632f32f..127e7ef 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: CyteTypeR Title: CyteType for R -Version: 0.4.0 +Version: 0.4.1 Description: CyteTypeR is the R version of CyteType python package. Authors@R: person("Nygen Analytics AB", , ,"contact@nygen.io", role = c("aut", "cre")) From a281ccce04772ebc4511ef32b5380fbc8471a205 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:38:25 +0100 Subject: [PATCH 03/18] clean query --- R/cytetype.R | 2 -- R/schema.R | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/R/cytetype.R b/R/cytetype.R index 20768ea..239d2ed 100644 --- a/R/cytetype.R +++ b/R/cytetype.R @@ -308,9 +308,7 @@ CyteTypeR <- function(obj, ) group_key <- prepped_data$group_key - prepped_data$group_key <- NULL coordinates_key <- prepped_data$coordinates_key %||% "umap" - prepped_data$coordinates_key <- NULL .validate_input_data(prepped_data) diff --git a/R/schema.R b/R/schema.R index 4eea68b..fdcf8c2 100644 --- a/R/schema.R +++ b/R/schema.R @@ -121,6 +121,7 @@ LLMModelConfig <- function(provider, query_list$input_data$vars_h5_path <- NULL query_list$input_data$obs_duckdb_path <- NULL query_list$input_data$group_key <- NULL + query_list$input_data$coordinates_key <- NULL return(query_list) } From ee9de459ec0f11203db82fdfbebd25f7047aa16b Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:41:02 +0100 Subject: [PATCH 04/18] Update cytetype.R --- R/cytetype.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/cytetype.R b/R/cytetype.R index 239d2ed..5c14511 100644 --- a/R/cytetype.R +++ b/R/cytetype.R @@ -82,7 +82,7 @@ PrepareCyteTypeR <- function(obj, n_top_genes = 50, aggregate_metadata = TRUE, min_percentage = 10, - pcent_batch_size = 2000, + pcent_batch_size = 5000, coordinates_key = "umap", max_cells_per_group = 1000, vars_h5_path = "vars.h5", From 7e721ba713f63ba3b5e0e709ef5a5393eb39e013 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 09:11:48 +0100 Subject: [PATCH 05/18] Update cytetype.R --- R/cytetype.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/cytetype.R b/R/cytetype.R index 5c14511..24ec267 100644 --- a/R/cytetype.R +++ b/R/cytetype.R @@ -19,7 +19,7 @@ #' @param min_percentage Numeric threshold for minimum percentage. #' Default is 10. #' @param pcent_batch_size Integer specifying batch size for expression percentage -#' calculations. Default is 2000. +#' calculations. Default is 5000. #' @param coordinates_key Character string specifying which dimensional reduction #' to use for visualization coordinates (e.g., "umap", "tsne"). Default is "umap". #' @param max_cells_per_group Integer specifying maximum cells per cluster for @@ -167,7 +167,7 @@ PrepareCyteTypeR <- function(obj, log_info("Built vars.h5 successfully.") log_info("Building obs.duckdb (API) from cell metadata (Seurat obj@meta.data)...") - + .save_obs_duckdb(obs_duckdb_path, obj@meta.data, coordinates = coords, coordinates_key = coordinates_key) log_info("Built obs.duckdb successfully.") @@ -365,7 +365,7 @@ CyteTypeR <- function(obj, } }) } - + query_for_json <- .prepare_query_for_json(query_list) if (save_query) { write_json(query_for_json, path = query_filename, auto_unbox = TRUE, pretty = TRUE) From 1a019a05841ec72d3c01fbaf251e439d9870485d Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 09:15:24 +0100 Subject: [PATCH 06/18] Update PrepareCyteTypeR.Rd --- man/PrepareCyteTypeR.Rd | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/man/PrepareCyteTypeR.Rd b/man/PrepareCyteTypeR.Rd index 1c56a7c..5bc935e 100644 --- a/man/PrepareCyteTypeR.Rd +++ b/man/PrepareCyteTypeR.Rd @@ -12,7 +12,7 @@ PrepareCyteTypeR( n_top_genes = 50, aggregate_metadata = TRUE, min_percentage = 10, - pcent_batch_size = 2000, + pcent_batch_size = 5000, coordinates_key = "umap", max_cells_per_group = 1000, vars_h5_path = "vars.h5", @@ -41,7 +41,7 @@ across cells within each cluster. Default is \code{TRUE}.} Default is 10.} \item{pcent_batch_size}{Integer specifying batch size for expression percentage -calculations. Default is 2000.} +calculations. Default is 5000.} \item{coordinates_key}{Character string specifying which dimensional reduction to use for visualization coordinates (e.g., "umap", "tsne"). Default is "umap".} From 1fa4e79c409c7005a9d758999aab224b7af35f0c Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 09:59:10 +0100 Subject: [PATCH 07/18] Update client.R --- R/client.R | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/R/client.R b/R/client.R index b03f80c..8125fb0 100644 --- a/R/client.R +++ b/R/client.R @@ -1,5 +1,8 @@ -# Upload size limits: 100MB and 50GB respectively (vars_h5 uses numeric to avoid integer overflow) -.MAX_UPLOAD_BYTES <- list(obs_duckdb = 100L * 1024L * 1024L, vars_h5 = 50 * 1024 * 1024 * 1024) +# Upload size limits: use numeric to avoid integer overflow) +.MAX_UPLOAD_BYTES <- list( + obs_duckdb = 2 * 1024 * 1024 * 1024, # 2 GB + vars_h5 = 50 * 1024 * 1024 * 1024 # 50 GB +) # Chunked upload retry: delays (sec) after 1st, 2nd, 3rd failure; status codes treated as transient (incl. network/gateway) .CHUNK_UPLOAD_BACKOFF_SECS <- c(1L, 5L, 20L) From 8972a067c18173fd843f962e5c6e403c26fa8d0c Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 13:43:30 +0100 Subject: [PATCH 08/18] Update api.R --- R/api.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/api.R b/R/api.R index 6e9beda..ff06d20 100644 --- a/R/api.R +++ b/R/api.R @@ -1,4 +1,5 @@ # API Response Helper for CyteType +# HTTP primitives, request utilities, and shared constants for CyteType API communication. #' @importFrom httr2 req_auth_bearer_token req_body_json req_headers req_method req_perform req_timeout request resp_body_json resp_body_string resp_status .api_response_helper <- function(job_id, api_url, req_item, auth_token = NULL) { From 9c8ea8b8594df0b64409ea27a15d6a7822d58a71 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 13:44:35 +0100 Subject: [PATCH 09/18] Update api.R --- R/api.R | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/R/api.R b/R/api.R index ff06d20..43a55a1 100644 --- a/R/api.R +++ b/R/api.R @@ -1,6 +1,30 @@ # API Response Helper for CyteType # HTTP primitives, request utilities, and shared constants for CyteType API communication. +# Upload size limits (numeric to avoid integer overflow) +.MAX_UPLOAD_BYTES <- list( + obs_duckdb = 2 * 1024 * 1024 * 1024, # 2 GB + vars_h5 = 50 * 1024 * 1024 * 1024 # 50 GB +) + +# Chunked upload retry: delays (sec) after 1st, 2nd, 3rd failure; status codes treated as transient (incl. network/gateway) +.CHUNK_UPLOAD_BACKOFF_SECS <- c(1L, 5L, 20L) +.CHUNK_UPLOAD_TRANSIENT_STATUSES <- c(500L, 502L, 503L, 504L) + +# URL path builder (avoids file.path backslashes on Windows) +.url_path <- function(...) { + x <- vapply(c(...), function(seg) gsub("^/+|/+$", "", as.character(seg)), character(1)) + paste(x[nzchar(x)], collapse = "/") +} + +.make_req <- function(base_url, path, auth_token) { + req <- request(paste0(base_url, "/", path)) + if (!is.null(auth_token)) { + req <- req_headers(req, Authorization = paste("Bearer", auth_token)) + } + return(req) +} + #' @importFrom httr2 req_auth_bearer_token req_body_json req_headers req_method req_perform req_timeout request resp_body_json resp_body_string resp_status .api_response_helper <- function(job_id, api_url, req_item, auth_token = NULL) { From 0a20305e6dc57c53045a88f0b5afc8f7c20d1178 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:33:25 +0100 Subject: [PATCH 10/18] Update R-CMD-check.yaml --- .github/workflows/R-CMD-check.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/R-CMD-check.yaml b/.github/workflows/R-CMD-check.yaml index a246997..cf89bf2 100644 --- a/.github/workflows/R-CMD-check.yaml +++ b/.github/workflows/R-CMD-check.yaml @@ -1,9 +1,11 @@ # Workflow derived from https://github.com/r-lib/actions/tree/v2/examples # Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help + on: push: branches: [main, master] pull_request: + types: [opened, synchronize, reopened, ready_for_review] name: R-CMD-check.yaml @@ -12,6 +14,7 @@ permissions: read-all jobs: R-CMD-check: runs-on: ${{ matrix.config.os }} + if: github.event_name != 'pull_request' || github.event.pull_request.draft == false name: ${{ matrix.config.os }} (${{ matrix.config.r }}) @@ -27,7 +30,7 @@ jobs: env: GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} R_KEEP_PKG_SOURCE: yes - + steps: - uses: actions/checkout@v4 From 4cc0d3c2c3be124192de3b727107536706028562 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:01:17 +0100 Subject: [PATCH 11/18] Update DESCRIPTION --- DESCRIPTION | 2 ++ 1 file changed, 2 insertions(+) diff --git a/DESCRIPTION b/DESCRIPTION index 127e7ef..ce67b95 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -26,8 +26,10 @@ Imports: tidyr Suggests: duckdb, + future, furrr, knitr, + progressr, rhdf5filters, rmarkdown, testthat (>= 3.0.0) From 19b0f91d7514ba7b75c7166bde1396f3e7f1f325 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:01:31 +0100 Subject: [PATCH 12/18] man --- man/CyteTypeR.Rd | 3 +++ man/PrepareCyteTypeR.Rd | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/man/CyteTypeR.Rd b/man/CyteTypeR.Rd index 0ced50e..9200ee8 100644 --- a/man/CyteTypeR.Rd +++ b/man/CyteTypeR.Rd @@ -19,6 +19,7 @@ CyteTypeR( save_query = TRUE, query_filename = "query.json", upload_timeout_seconds = 3600L, + upload_max_workers = 6L, require_artifacts = TRUE, show_progress = TRUE, override_existing_results = FALSE @@ -53,6 +54,8 @@ CyteTypeR( \item{upload_timeout_seconds}{Integer. Socket read timeout (seconds) for each artifact upload. Default is 3600.} +\item{upload_max_workers}{Integer. Maximum number of parallel upload workers. Default is 6.} + \item{require_artifacts}{Logical. If \code{TRUE}, an error during artifact build or upload stops the run; if \code{FALSE}, failures are skipped and annotation continues without artifacts. Default is \code{TRUE}.} \item{show_progress}{Logical. Whether to show progress (spinner and cluster status). Set \code{FALSE} to disable. Default is \code{TRUE}.} diff --git a/man/PrepareCyteTypeR.Rd b/man/PrepareCyteTypeR.Rd index 5bc935e..de36cbf 100644 --- a/man/PrepareCyteTypeR.Rd +++ b/man/PrepareCyteTypeR.Rd @@ -12,6 +12,7 @@ PrepareCyteTypeR( n_top_genes = 50, aggregate_metadata = TRUE, min_percentage = 10, + max_metadata_categories = 500L, pcent_batch_size = 5000, coordinates_key = "umap", max_cells_per_group = 1000, @@ -40,6 +41,10 @@ across cells within each cluster. Default is \code{TRUE}.} \item{min_percentage}{Numeric threshold for minimum percentage. Default is 10.} +\item{max_metadata_categories}{Integer. Maximum number of unique values a +categorical metadata column may have to be included in aggregation. Columns +exceeding this limit are silently skipped. Default is 500.} + \item{pcent_batch_size}{Integer specifying batch size for expression percentage calculations. Default is 5000.} From c9eda2aa8075ed87a4653029d12ab2b9d8005161 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:01:46 +0100 Subject: [PATCH 13/18] Update api.R --- R/api.R | 110 +++++++++++++++----------------------------------------- 1 file changed, 28 insertions(+), 82 deletions(-) diff --git a/R/api.R b/R/api.R index 43a55a1..d3ea693 100644 --- a/R/api.R +++ b/R/api.R @@ -1,4 +1,3 @@ -# API Response Helper for CyteType # HTTP primitives, request utilities, and shared constants for CyteType API communication. # Upload size limits (numeric to avoid integer overflow) @@ -102,89 +101,36 @@ }) } -# Make Request for Job Results -.make_results_request <- function(job_id, api_url, auth_token = NULL) { - - # Helper for consistent responses - make_response <- function(status, result = NULL, message, raw = NULL) { - list(status = status, result = result, message = message, raw_response = raw) +# PUT raw bytes to a presigned URL with retry, ETag validation, and proper error classification. +.put_to_presigned_url <- function(presigned_url, chunk_data, timeout_seconds) { + resp <- request(presigned_url) |> + req_method("PUT") |> + httr2::req_body_raw(chunk_data, type = "application/octet-stream") |> + req_timeout(timeout_seconds) |> + httr2::req_error(is_error = function(resp) FALSE) |> + httr2::req_retry( + max_tries = length(.CHUNK_UPLOAD_BACKOFF_SECS) + 1L, + retry_on_failure = TRUE, + is_transient = function(resp) resp_status(resp) %in% .CHUNK_UPLOAD_TRANSIENT_STATUSES, + backoff = function(tries) .CHUNK_UPLOAD_BACKOFF_SECS[min(tries, length(.CHUNK_UPLOAD_BACKOFF_SECS))] + ) |> + req_perform() + + status <- resp_status(resp) + if (status >= 400L) { + stop(cytetype_api_error( + message = paste0("Presigned upload rejected with HTTP ", status), + call = "api" + )) } - tryCatch({ - # Get job status - status_resp <- .api_response_helper(job_id, api_url, 'status', auth_token) - - # Handle 404 immediately - if (status_resp$status_code == 404) { - return(make_response("not_found", message = "Job not found")) - } - - job_status <- status_resp$data$jobStatus - status_data <- status_resp$data - - # Process based on job status - if (job_status == "completed") { - # Try to get results - results_resp <- tryCatch( - .api_response_helper(job_id, api_url, 'results', auth_token), - error = function(e) { - make_response( - "failed", - message = paste("Job completed but results unavailable:", e$message), - raw = status_data - ) - } - ) - - if (!is.null(results_resp$status) && results_resp$status == "failed") { - return(results_resp) - } - - if (results_resp$status_code == 404) { - return(make_response( - "failed", - message = "Job completed but results are unavailable", - raw = status_data - )) - } - - return(make_response( - "completed", - result = results_resp$data, - message = "Job completed successfully", - raw = status_data - )) - } - - if (job_status == "failed") { - return(make_response( - "failed", - message = "Job failed", - raw = status_data - )) - } - - if (job_status %in% c("processing", "pending")) { - return(make_response( - job_status, - message = paste("Job is", job_status), - raw = status_data - )) - } - - # Unknown status - return(make_response( - "unknown", - message = paste("Unknown job status:", job_status), - raw = status_data + etag <- httr2::resp_header(resp, "ETag") + if (is.null(etag) || !nzchar(etag)) { + stop(cytetype_api_error( + message = "Presigned PUT succeeded but response is missing ETag header", + call = "network" )) + } - }, error = function(e) { - # Handle any unexpected errors - return(make_response( - "error", - message = paste("Error checking job status:", e$message) - )) - }) + etag } - From 1f87269ee2a967d5d88eca38045143aa5d4f5925 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:02:03 +0100 Subject: [PATCH 14/18] Update artifacts.R --- R/artifacts.R | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/R/artifacts.R b/R/artifacts.R index 9130de0..1d2c18c 100644 --- a/R/artifacts.R +++ b/R/artifacts.R @@ -78,7 +78,8 @@ # Input m must be genes x cells (n_vars x n_obs) as returned by Seurat GetAssayData. # Stored via CSC(genes x cells) == CSR(cells x genes); no transpose needed. .write_sparse_group <- function(fid, group, m, n_obs, col_batch, chunk_size, - csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT") { + csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT", + pb_id = NULL) { if (csr) { m <- as(m, "CsparseMatrix") n_vars <- nrow(m) @@ -120,6 +121,7 @@ } new_indptr <- as.numeric(chunk@p[-1L]) + indptr[length(indptr)] indptr <- c(indptr, new_indptr) + if (!is.null(pb_id)) cli::cli_progress_update(id = pb_id) } rhdf5::h5createDataset(fid, paste0(group, "/indptr"), dims = length(indptr), @@ -143,6 +145,21 @@ } chunk_size <- max(1L, min(n_obs * 10L, min_chunk_size)) + raw_col_batch <- if (!is.null(raw_mat)) { + max(1L, as.integer(100000000 / max(nrow(raw_mat), 1))) + } else NULL + + vars_n_batches <- length(seq(1L, n_vars, by = col_batch)) + raw_n_batches <- if (!is.null(raw_mat)) length(seq(1L, ncol(raw_mat), by = raw_col_batch)) else 0L + total_batches <- vars_n_batches + raw_n_batches + + pb_label <- if (raw_n_batches > 0L) "Writing vars.h5 (normalized + raw)" else "Writing vars.h5" + pb_id <- cli::cli_progress_bar( + format = paste0(pb_label, " {cli::pb_bar} {cli::pb_current}/{cli::pb_total} batches ({cli::pb_rate})"), + total = total_batches, + clear = FALSE + ) + if (file.exists(out_file) && !file.remove(out_file)) { stop("Could not remove existing file: ", out_file) } @@ -151,12 +168,11 @@ fid <- rhdf5::H5Fopen(out_file, flags = "H5F_ACC_RDWR") on.exit(rhdf5::H5Fclose(fid), add = TRUE) - .write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size) + .write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size, pb_id = pb_id) if (!is.null(raw_mat)) { - raw_col_batch <- max(1L, as.integer(100000000 / max(nrow(raw_mat), 1))) .write_sparse_group(fid, "raw", raw_mat, n_obs, raw_col_batch, chunk_size, - csr = TRUE, data_h5type = "H5T_NATIVE_INT32") + csr = TRUE, data_h5type = "H5T_NATIVE_INT32", pb_id = pb_id) } if (!is.null(feature_df)) { @@ -189,6 +205,8 @@ } } + cli::cli_progress_step("Writing obs.duckdb") + if (file.exists(out_file)) file.remove(out_file) config <- list(threads = as.character(threads), memory_limit = memory_limit, temp_directory = temp_directory) con <- duckdb::dbConnect(duckdb::duckdb(), out_file, config = config) From 2e264c1419f728d344c3cf3f680be8b157337a60 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:02:22 +0100 Subject: [PATCH 15/18] Update client.R --- R/client.R | 210 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 172 insertions(+), 38 deletions(-) diff --git a/R/client.R b/R/client.R index 8125fb0..73f7d81 100644 --- a/R/client.R +++ b/R/client.R @@ -1,26 +1,4 @@ -# Upload size limits: use numeric to avoid integer overflow) -.MAX_UPLOAD_BYTES <- list( - obs_duckdb = 2 * 1024 * 1024 * 1024, # 2 GB - vars_h5 = 50 * 1024 * 1024 * 1024 # 50 GB -) - -# Chunked upload retry: delays (sec) after 1st, 2nd, 3rd failure; status codes treated as transient (incl. network/gateway) -.CHUNK_UPLOAD_BACKOFF_SECS <- c(1L, 5L, 20L) -.CHUNK_UPLOAD_TRANSIENT_STATUSES <- c(500L, 502L, 503L, 504L) - -# URL path builder (avoids file.path backslashes on Windows) -.url_path <- function(...) { - x <- vapply(c(...), function(seg) gsub("^/+|/+$", "", as.character(seg)), character(1)) - paste(x[nzchar(x)], collapse = "/") -} - -.make_req <- function(base_url, path, auth_token) { - req <- request(paste0(base_url, "/", path)) - if (!is.null(auth_token)) { - req <- req_headers(req, Authorization = paste("Bearer", auth_token)) - } - return(req) -} +# Multi-step workflow orchestration: uploads, job submission, polling, and result retrieval. # Chunked upload flow (initiate -> PUT chunks -> complete). Uses scalar timeout; httr2 req_timeout takes seconds. @@ -29,8 +7,8 @@ file_kind, file_path, timeout_seconds = 3600L, - max_workers = 4L) { - api_url <- paste0(api_url, "/upload") + max_workers = 6L, + show_progress = TRUE) { if (!file.exists(file_path)) stop("Upload file not found: ", file_path) @@ -42,12 +20,13 @@ } connection_timeout <- 72L + api_url <- paste0(api_url, "/upload") - # Step 1 – Initiate (empty POST; explicit empty body for compatibility) + # Step 1 – Initiate init_resp <- tryCatch( .make_req(api_url, paste0(file_kind, "/initiate"), auth_token) |> req_method("POST") |> - httr2::req_body_raw(raw(0), type = "application/json") |> + httr2::req_body_json(list("file_size_bytes" = size), type = "application/json") |> req_timeout(connection_timeout) |> req_perform() |> resp_body_json(), @@ -66,12 +45,22 @@ n_chunks <- if (size > 0) as.integer(ceiling(size / chunk_size)) else 0L + presigned_urls <- init_resp$presigned_urls %||% init_resp$presignedUrls %||% list() + r2_upload_id <- init_resp$r2_upload_id %||% init_resp$r2UploadId %||% NULL + use_r2 <- length(presigned_urls) > 0L && !is.null(r2_upload_id) + + log_info("Upload {file_kind}: {n_chunks} chunks, use_r2={use_r2}, presigned_urls={length(presigned_urls)}") + + if (use_r2 && length(presigned_urls) < n_chunks) { + stop("Server returned ", length(presigned_urls), " presigned URLs but need at least ", n_chunks, " (one per chunk).") + } + # Step 2 – Upload chunks (future for cross-platform parallel, req_retry for resilience) if (n_chunks > 0) { effective_workers <- min(max_workers, n_chunks) chunk_idxs <- seq_len(n_chunks) - 1L - .upload_one_chunk <- function(chunk_idx) { + .upload_chunk_server <- function(chunk_idx) { tryCatch({ con <- file(file_path, "rb") on.exit(close(con), add = TRUE) @@ -97,22 +86,84 @@ }) } - if (effective_workers > 1L && requireNamespace("furrr", quietly = TRUE)) { - chunk_results <- furrr::future_map(chunk_idxs, .upload_one_chunk) + .upload_chunk_r2 <- function(chunk_idx) { + tryCatch({ + con <- file(file_path, "rb") + on.exit(close(con), add = TRUE) + offset <- chunk_idx * chunk_size + read_size <- min(chunk_size, size - offset) + seek(con, offset) + chunk_data <- readBin(con, what = "raw", n = read_size) + presigned_url <- presigned_urls[[chunk_idx + 1L]] + etag <- .put_to_presigned_url(presigned_url, chunk_data, timeout_seconds) + list(ok = TRUE, etag = etag, part_number = chunk_idx + 1L) + }, error = function(e) { + .stop_if_rate_limited(e) + list(ok = FALSE, chunk_idx = chunk_idx, message = conditionMessage(e)) + }) + } + + upload_fn <- if (use_r2) .upload_chunk_r2 else .upload_chunk_server + has_furrr <- requireNamespace("furrr", quietly = TRUE) + has_progressr <- requireNamespace("progressr", quietly = TRUE) + use_parallel <- effective_workers > 1L && has_furrr && (!show_progress || has_progressr) + + if (use_parallel) { + oplan <- future::plan(future::multisession, workers = effective_workers) + on.exit(future::plan(oplan), add = TRUE) + + if (show_progress) { + pb_fmt <- paste0( + "Uploading ", file_kind, + " {cli::pb_bar} {cli::pb_current}/{cli::pb_total} chunks ({cli::pb_rate})" + ) + progressr::with_progress({ + p <- progressr::progressor(steps = n_chunks) + chunk_results <- furrr::future_map(chunk_idxs, function(idx) { + result <- upload_fn(idx) + p() + result + }) + }, handlers = progressr::handler_cli(format = pb_fmt)) + } else { + chunk_results <- furrr::future_map(chunk_idxs, upload_fn) + } } else { - chunk_results <- purrr::map(chunk_idxs, .upload_one_chunk) + if (show_progress) { + pb_fmt <- paste0( + "Uploading ", file_kind, + " {cli::pb_bar} {cli::pb_current}/{cli::pb_total} chunks ({cli::pb_rate})" + ) + pb_id <- cli::cli_progress_bar(format = pb_fmt, total = n_chunks, clear = FALSE, .envir = environment()) + } + chunk_results <- purrr::map(chunk_idxs, function(idx) { + result <- upload_fn(idx) + if (show_progress) cli::cli_progress_update(id = pb_id) + result + }) } + failed <- which(!vapply(chunk_results, function(r) is.list(r) && isTRUE(r$ok), logical(1))) if (length(failed) > 0L) { r <- chunk_results[[failed[1L]]] stop("Upload chunk ", r$chunk_idx, " failed: ", r$message) } } - # Step 3 – Complete (empty POST; explicit empty body for compatibility) + + # Step 3 – Complete + complete_body <- if (use_r2) { + parts <- lapply(chunk_results, function(r) { + list(ETag = r$etag, PartNumber = r$part_number) + }) + list(parts = parts) + } else { + list() + } + complete_resp <- tryCatch( .make_req(api_url, paste0(upload_id, "/complete"), auth_token) |> req_method("POST") |> - httr2::req_body_raw(raw(0), type = "application/json") |> + httr2::req_body_json(complete_body, type = "application/json") |> req_timeout(connection_timeout) |> req_perform() |> resp_body_json(), @@ -123,12 +174,16 @@ } -.upload_obs_duckdb <- function(api_url, auth_token, file_path, timeout_seconds = 3600L, max_workers = 4L) { - .upload_file_chunked(api_url, auth_token, "obs_duckdb", file_path, timeout_seconds, max_workers) +.upload_obs_duckdb <- function(api_url, auth_token, file_path, timeout_seconds = 3600L, + max_workers = 6L, show_progress = TRUE) { + .upload_file_chunked(api_url, auth_token, "obs_duckdb", file_path, timeout_seconds, + max_workers, show_progress) } -.upload_vars_h5 <- function(api_url, auth_token, file_path, timeout_seconds = 3600L, max_workers = 4L) { - .upload_file_chunked(api_url, auth_token, "vars_h5", file_path, timeout_seconds, max_workers) +.upload_vars_h5 <- function(api_url, auth_token, file_path, timeout_seconds = 3600L, + max_workers = 6L, show_progress = TRUE) { + .upload_file_chunked(api_url, auth_token, "vars_h5", file_path, timeout_seconds, + max_workers, show_progress) } # Submit a new job @@ -182,6 +237,86 @@ return(NA_character_) }) } + +# Multi-step job results request (status check + results fetch) +.make_results_request <- function(job_id, api_url, auth_token = NULL) { + + make_response <- function(status, result = NULL, message, raw = NULL) { + list(status = status, result = result, message = message, raw_response = raw) + } + + tryCatch({ + status_resp <- .api_response_helper(job_id, api_url, 'status', auth_token) + + if (status_resp$status_code == 404) { + return(make_response("not_found", message = "Job not found")) + } + + job_status <- status_resp$data$jobStatus + status_data <- status_resp$data + + if (job_status == "completed") { + results_resp <- tryCatch( + .api_response_helper(job_id, api_url, 'results', auth_token), + error = function(e) { + make_response( + "failed", + message = paste("Job completed but results unavailable:", e$message), + raw = status_data + ) + } + ) + + if (!is.null(results_resp$status) && results_resp$status == "failed") { + return(results_resp) + } + + if (results_resp$status_code == 404) { + return(make_response( + "failed", + message = "Job completed but results are unavailable", + raw = status_data + )) + } + + return(make_response( + "completed", + result = results_resp$data, + message = "Job completed successfully", + raw = status_data + )) + } + + if (job_status == "failed") { + return(make_response( + "failed", + message = "Job failed", + raw = status_data + )) + } + + if (job_status %in% c("processing", "pending")) { + return(make_response( + job_status, + message = paste("Job is", job_status), + raw = status_data + )) + } + + return(make_response( + "unknown", + message = paste("Unknown job status:", job_status), + raw = status_data + )) + + }, error = function(e) { + return(make_response( + "error", + message = paste("Error checking job status:", e$message) + )) + }) +} + # Polling for results .poll_for_results <- function(job_id, api_url, poll_interval = NULL, timeout = NULL, auth_token = NULL, @@ -356,4 +491,3 @@ } } - From 24264453d9f9b2d9e51cd70df8f7379eed8bebf8 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:02:33 +0100 Subject: [PATCH 16/18] Update cytetype.R --- R/cytetype.R | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/R/cytetype.R b/R/cytetype.R index 24ec267..bcad406 100644 --- a/R/cytetype.R +++ b/R/cytetype.R @@ -18,6 +18,9 @@ #' across cells within each cluster. Default is `TRUE`. #' @param min_percentage Numeric threshold for minimum percentage. #' Default is 10. +#' @param max_metadata_categories Integer. Maximum number of unique values a +#' categorical metadata column may have to be included in aggregation. Columns +#' exceeding this limit are silently skipped. Default is 500. #' @param pcent_batch_size Integer specifying batch size for expression percentage #' calculations. Default is 5000. #' @param coordinates_key Character string specifying which dimensional reduction @@ -82,6 +85,7 @@ PrepareCyteTypeR <- function(obj, n_top_genes = 50, aggregate_metadata = TRUE, min_percentage = 10, + max_metadata_categories = 500L, pcent_batch_size = 5000, coordinates_key = "umap", max_cells_per_group = 1000, @@ -96,14 +100,15 @@ PrepareCyteTypeR <- function(obj, .validate_marker_table(marker_table,sorted_clusters) if (aggregate_metadata){ - print("Aggregating metadata...") - group_metadata <- .aggregate_metadata(obj, group_key, min_percentage = min_percentage) + log_info("Aggregating metadata...") + group_metadata <- .aggregate_metadata(obj, group_key, min_percentage = min_percentage, + max_metadata_categories = max_metadata_categories) # Map cluster ids to use those natural numbers names(group_metadata) <- cluster_map[names(group_metadata)] } else{group_metadata <- list() } - print(paste("Preparing marker genes with top",n_top_genes,"genes...")) + log_info("Preparing marker genes with top {n_top_genes} genes...") marker_genes <- marker_table %>% group_by(cluster) %>% @@ -121,14 +126,14 @@ PrepareCyteTypeR <- function(obj, visualization_data <- NULL tryCatch({ coords <- Seurat::Embeddings(obj, reduction = coordinates_key) - print("Preparing visualisation data...") + log_info("Preparing visualization data...") visualization_data <- .sample_visualization_data(obj, group_key, coordinates_key, cluster_map, max_cells_per_group) }, error = function(e) { log_warn(paste("Could not extract coordinates for reduction '", coordinates_key, "'. Continuing without visualization data. Error:", conditionMessage(e))) }) - print("Calculating expression percentages...") + log_info("Calculating expression percentages...") expression_percentages <- .calculate_pcent(obj, group_key, cluster_map, pcent_batch_size) # Prep cluster_map to named list for each "group/cluster" @@ -192,7 +197,7 @@ PrepareCyteTypeR <- function(obj, # Store query obj@misc$query <- prepped_data - print("Done!") + log_info("Preparation complete.") return(prepped_data) } #' @@ -217,6 +222,7 @@ PrepareCyteTypeR <- function(obj, #' @param save_query Logical. Whether to save the request payload to a JSON file. Default is `TRUE`. #' @param query_filename Character. Filename for the saved query when `save_query` is `TRUE`. Default is `"query.json"`. #' @param upload_timeout_seconds Integer. Socket read timeout (seconds) for each artifact upload. Default is 3600. +#' @param upload_max_workers Integer. Maximum number of parallel upload workers. Default is 6. #' @param require_artifacts Logical. If `TRUE`, an error during artifact build or upload stops the run; if `FALSE`, failures are skipped and annotation continues without artifacts. Default is `TRUE`. #' @param show_progress Logical. Whether to show progress (spinner and cluster status). Set `FALSE` to disable. Default is `TRUE`. #' @param override_existing_results Logical. If `TRUE`, allow overwriting existing results with the same `results_prefix`. If `FALSE` and results exist, an error is raised. Default is `FALSE`. @@ -274,6 +280,7 @@ CyteTypeR <- function(obj, save_query = TRUE, query_filename = "query.json", upload_timeout_seconds = 3600L, + upload_max_workers = 6L, require_artifacts = TRUE, show_progress = TRUE, override_existing_results = FALSE @@ -341,11 +348,13 @@ CyteTypeR <- function(obj, obs_duckdb_path <- prepped_data$obs_duckdb_path log_info("Uploading obs.duckdb (cell metadata)...") - cell_metadata_upload <- .upload_obs_duckdb(api_url, auth_token, obs_duckdb_path, upload_timeout_seconds) + cell_metadata_upload <- .upload_obs_duckdb(api_url, auth_token, obs_duckdb_path, + upload_timeout_seconds, max_workers = upload_max_workers, show_progress = show_progress) log_info("Uploaded obs.duckdb successfully.") log_info("Uploading vars.h5 (feature expression)...") - feature_expression_upload <- .upload_vars_h5(api_url, auth_token, vars_h5_path, upload_timeout_seconds) + feature_expression_upload <- .upload_vars_h5(api_url, auth_token, vars_h5_path, + upload_timeout_seconds, max_workers = upload_max_workers, show_progress = show_progress) log_info("Uploaded vars.h5 successfully.") query_list$uploaded_files <- list( From 4df9fe584fae608e9dd5582da049582a9980bf52 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:03:04 +0100 Subject: [PATCH 17/18] Update seurat_helpers.R --- R/seurat_helpers.R | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/R/seurat_helpers.R b/R/seurat_helpers.R index 79036fa..357e01d 100644 --- a/R/seurat_helpers.R +++ b/R/seurat_helpers.R @@ -93,7 +93,8 @@ .aggregate_metadata <- function( seurat_obj, group_key, - min_percentage = 10) + min_percentage = 10, + max_metadata_categories = 128L) { metadata <- droplevels(seurat_obj@meta.data) # Get unique groups and initialize result structure @@ -109,11 +110,12 @@ for (column in colnames(metadata)){ if (column == group_key){ - next # skip group_key group in metadata + next } if (inherits(metadata[[column]], "factor") || is.character(metadata[[column]])){ + n_unique <- length(unique(metadata[[column]])) + if (n_unique > max_metadata_categories) next - # Create "crosstab": rows = group values in a metadata cat, columns = groups in group_key crosstab <- table(metadata[[column]], metadata[[group_key]], useNA = "no") # Percentages in each column(metadata group with prop.table margin=2) From cab3949fcfba4d9f8ddebccb1993ff84375fd6c2 Mon Sep 17 00:00:00 2001 From: Yi Su <90744702+suu-yi@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:20:15 +0100 Subject: [PATCH 18/18] test updates --- tests/testthat/test-api.R | 124 ++++++++++++++++++++++ tests/testthat/test-client.R | 199 ++++++++++++++++++++++++++++++++--- 2 files changed, 311 insertions(+), 12 deletions(-) create mode 100644 tests/testthat/test-api.R diff --git a/tests/testthat/test-api.R b/tests/testthat/test-api.R new file mode 100644 index 0000000..694dc59 --- /dev/null +++ b/tests/testthat/test-api.R @@ -0,0 +1,124 @@ +# --- Constants --- + +test_that(".MAX_UPLOAD_BYTES has expected limits", { + lim <- CyteTypeR:::.MAX_UPLOAD_BYTES + expect_equal(lim$obs_duckdb, 2 * 1024 * 1024 * 1024) + expect_equal(lim$vars_h5, 50 * 1024 * 1024 * 1024) + expect_true(is.numeric(lim$obs_duckdb)) + expect_true(is.numeric(lim$vars_h5)) +}) + +test_that("chunked upload retry constants are as expected", { + expect_identical(CyteTypeR:::.CHUNK_UPLOAD_BACKOFF_SECS, c(1L, 5L, 20L)) + expect_identical(CyteTypeR:::.CHUNK_UPLOAD_TRANSIENT_STATUSES, c(500L, 502L, 503L, 504L)) +}) + +# --- .url_path --- + +test_that(".url_path joins segments and strips slashes", { + expect_identical( + CyteTypeR:::.url_path("https://api.example.com", "upload", "123"), + "https://api.example.com/upload/123" + ) +}) + +test_that(".url_path strips leading and trailing slashes from each segment", { + expect_identical( + CyteTypeR:::.url_path("https://api.example.com/", "/upload/", "/123/"), + "https://api.example.com/upload/123" + ) +}) + +test_that(".url_path drops empty segments", { + expect_identical( + CyteTypeR:::.url_path("https://api.example.com", "", "upload"), + "https://api.example.com/upload" + ) +}) + +test_that(".url_path handles single segment", { + expect_identical( + CyteTypeR:::.url_path("https://api.example.com"), + "https://api.example.com" + ) +}) + +# --- .make_req --- + +test_that(".make_req builds URL from base and path", { + req <- CyteTypeR:::.make_req("https://api.example.com", "v1/jobs", NULL) + expect_identical(req$url, "https://api.example.com/v1/jobs") +}) + +test_that(".make_req adds Authorization header when token provided", { + req <- CyteTypeR:::.make_req("https://api.example.com", "v1/jobs", "my-token") + expect_true("Authorization" %in% names(req$headers)) +}) + +test_that(".make_req omits Authorization header when token is NULL", { + req <- CyteTypeR:::.make_req("https://api.example.com", "v1/jobs", NULL) + expect_false("Authorization" %in% names(req$headers)) +}) + +# --- .put_to_presigned_url --- + +test_that(".put_to_presigned_url returns ETag on successful PUT", { + httr2::local_mocked_responses(function(req) { + httr2::response( + status_code = 200L, + headers = list("ETag" = "\"abc123\""), + body = charToRaw("") + ) + }) + etag <- CyteTypeR:::.put_to_presigned_url( + "https://r2.example.com/bucket/chunk/0?sig=xyz", + charToRaw("chunk-data"), + 60L + ) + expect_identical(etag, "\"abc123\"") +}) + +test_that(".put_to_presigned_url raises api error on HTTP 4xx", { + httr2::local_mocked_responses(function(req) { + httr2::response(status_code = 403L, body = charToRaw("Forbidden")) + }) + expect_error( + CyteTypeR:::.put_to_presigned_url( + "https://r2.example.com/bucket/chunk/0?sig=xyz", + charToRaw("chunk-data"), + 60L + ), + "Presigned upload rejected with HTTP 403", + class = "cytetype_api_error" + ) +}) + +test_that(".put_to_presigned_url raises network error when ETag header is missing", { + httr2::local_mocked_responses(function(req) { + httr2::response(status_code = 200L, body = charToRaw("")) + }) + expect_error( + CyteTypeR:::.put_to_presigned_url( + "https://r2.example.com/bucket/chunk/0?sig=xyz", + charToRaw("chunk-data"), + 60L + ), + "missing ETag header", + class = "cytetype_api_error" + ) +}) + +test_that(".put_to_presigned_url raises api error on HTTP 400", { + httr2::local_mocked_responses(function(req) { + httr2::response(status_code = 400L, body = charToRaw("Bad Request")) + }) + expect_error( + CyteTypeR:::.put_to_presigned_url( + "https://r2.example.com/bucket/chunk/0?sig=xyz", + charToRaw("chunk-data"), + 60L + ), + "Presigned upload rejected with HTTP 400", + class = "cytetype_api_error" + ) +}) diff --git a/tests/testthat/test-client.R b/tests/testthat/test-client.R index b828488..f9bf66d 100644 --- a/tests/testthat/test-client.R +++ b/tests/testthat/test-client.R @@ -1,15 +1,3 @@ -test_that(".MAX_UPLOAD_BYTES has expected limits", { - lim <- CyteTypeR:::.MAX_UPLOAD_BYTES - expect_identical(lim$obs_duckdb, 100L * 1024L * 1024L) - expect_equal(lim$vars_h5, 50 * 1024 * 1024 * 1024) - expect_true(is.numeric(lim$vars_h5), info = "vars_h5 must be numeric to avoid integer overflow") -}) - -test_that("chunked upload retry constants are as expected", { - expect_identical(CyteTypeR:::.CHUNK_UPLOAD_BACKOFF_SECS, c(1L, 5L, 20L)) - expect_identical(CyteTypeR:::.CHUNK_UPLOAD_TRANSIENT_STATUSES, c(500L, 502L, 503L, 504L)) -}) - test_that(".upload_file_chunked errors when file does not exist", { expect_error( CyteTypeR:::.upload_file_chunked("https://api.example.com", NULL, "obs_duckdb", "/nonexistent/path.duckdb", 60L), @@ -27,6 +15,193 @@ test_that(".upload_file_chunked errors for unknown file_kind (no upload limit)", ) }) +test_that(".upload_file_chunked rejects when presigned URL count < chunk count", { + f <- tempfile(fileext = ".h5") + on.exit(unlink(f), add = TRUE) + writeBin(raw(1024), f) + + httr2::local_mocked_responses(function(req) { + httr2::response( + status_code = 200L, + headers = list("Content-Type" = "application/json"), + body = charToRaw(jsonlite::toJSON(list( + upload_id = "u1", + chunk_size_bytes = 512L, + r2_upload_id = "r2-abc", + presigned_urls = list("https://r2.example.com/chunk/0") + ), auto_unbox = TRUE)) + ) + }) + + expect_error( + CyteTypeR:::.upload_file_chunked( + "https://api.example.com", NULL, "vars_h5", f, 60L + ), + "presigned URLs but need at least" + ) +}) + +# --- Upload flow tests --- + +.capture_body <- function(req) { + tryCatch({ + d <- req$body$data + if (is.null(d)) return(NULL) + if (is.raw(d)) return(rawToChar(d)) + if (is.character(d)) return(d[[1L]]) + as.character(jsonlite::toJSON(d, auto_unbox = TRUE)) + }, error = function(e) NULL) +} + +test_that("server-proxy upload: initiate -> chunk PUTs -> complete with empty body", { + f <- tempfile(fileext = ".duckdb") + on.exit(unlink(f), add = TRUE) + writeBin(raw(1024), f) + + request_log <- list() + + httr2::local_mocked_responses(function(req) { + url <- req$url + request_log[[length(request_log) + 1L]] <<- list(url = url, method = req$method, body = .capture_body(req)) + + if (grepl("/initiate$", url)) { + httr2::response( + status_code = 200L, + headers = list("Content-Type" = "application/json"), + body = charToRaw(jsonlite::toJSON(list( + upload_id = "srv-u1", + chunk_size_bytes = 512L + ), auto_unbox = TRUE)) + ) + } else if (grepl("/chunk/", url)) { + httr2::response(status_code = 200L, body = charToRaw("")) + } else if (grepl("/complete$", url)) { + httr2::response( + status_code = 200L, + headers = list("Content-Type" = "application/json"), + body = charToRaw(jsonlite::toJSON(list(upload_id = "srv-u1"), auto_unbox = TRUE)) + ) + } else { + httr2::response(status_code = 404L, body = charToRaw("Not found")) + } + }) + + result <- CyteTypeR:::.upload_file_chunked( + "https://api.example.com", "tok", "obs_duckdb", f, 60L, + max_workers = 1L, show_progress = FALSE + ) + + expect_equal(result$upload_id, "srv-u1") + + urls <- vapply(request_log, `[[`, character(1), "url") + expect_true(any(grepl("/obs_duckdb/initiate", urls))) + expect_equal(sum(grepl("/chunk/", urls)), 2L) + expect_true(any(grepl("/complete", urls))) + + init_body <- jsonlite::fromJSON(request_log[[which(grepl("/initiate", urls))]]$body, simplifyVector = FALSE) + expect_equal(init_body$file_size_bytes, 1024) + + complete_body <- jsonlite::fromJSON(request_log[[which(grepl("/complete", urls))]]$body, simplifyVector = FALSE) + expect_null(complete_body$r2_upload_id) +}) + +test_that("R2 upload: presigned PUTs -> complete with r2_upload_id and parts", { + f <- tempfile(fileext = ".duckdb") + on.exit(unlink(f), add = TRUE) + writeBin(raw(1024), f) + + request_log <- list() + + httr2::local_mocked_responses(function(req) { + url <- req$url + request_log[[length(request_log) + 1L]] <<- list(url = url, method = req$method, body = .capture_body(req)) + + if (grepl("/initiate$", url)) { + httr2::response( + status_code = 200L, + headers = list("Content-Type" = "application/json"), + body = charToRaw(jsonlite::toJSON(list( + upload_id = "r2-u1", + chunk_size_bytes = 512L, + r2_upload_id = "r2-mp-xyz", + presigned_urls = list( + "https://r2.example.com/p/0?sig=a", + "https://r2.example.com/p/1?sig=b" + ) + ), auto_unbox = TRUE)) + ) + } else if (grepl("^https://r2\\.example\\.com/", url)) { + etag <- if (grepl("p/0", url)) "\"etag-chunk0\"" else "\"etag-chunk1\"" + httr2::response( + status_code = 200L, + headers = list("ETag" = etag), + body = charToRaw("") + ) + } else if (grepl("/complete$", url)) { + httr2::response( + status_code = 200L, + headers = list("Content-Type" = "application/json"), + body = charToRaw(jsonlite::toJSON(list(upload_id = "r2-u1"), auto_unbox = TRUE)) + ) + } else { + httr2::response(status_code = 404L, body = charToRaw("Not found")) + } + }) + + result <- CyteTypeR:::.upload_file_chunked( + "https://api.example.com", "tok", "obs_duckdb", f, 60L, + max_workers = 1L, show_progress = FALSE + ) + + expect_equal(result$upload_id, "r2-u1") + + urls <- vapply(request_log, `[[`, character(1), "url") + expect_equal(sum(grepl("r2\\.example\\.com", urls)), 2L) + expect_equal(sum(grepl("/chunk/", urls)), 0L) + + complete_body <- jsonlite::fromJSON(request_log[[which(grepl("/complete", urls))]]$body, simplifyVector = FALSE) + expect_null(complete_body$r2_upload_id) + expect_length(complete_body$parts, 2L) + expect_equal(complete_body$parts[[1]]$ETag, "\"etag-chunk0\"") + expect_equal(complete_body$parts[[1]]$PartNumber, 1) + expect_equal(complete_body$parts[[2]]$ETag, "\"etag-chunk1\"") + expect_equal(complete_body$parts[[2]]$PartNumber, 2) +}) + +test_that("upload stops with error when a chunk fails", { + f <- tempfile(fileext = ".duckdb") + on.exit(unlink(f), add = TRUE) + writeBin(raw(1024), f) + + httr2::local_mocked_responses(function(req) { + url <- req$url + if (grepl("/initiate$", url)) { + httr2::response( + status_code = 200L, + headers = list("Content-Type" = "application/json"), + body = charToRaw(jsonlite::toJSON(list( + upload_id = "fail-u1", + chunk_size_bytes = 512L + ), auto_unbox = TRUE)) + ) + } else if (grepl("/chunk/0", url)) { + httr2::response(status_code = 200L, body = charToRaw("")) + } else if (grepl("/chunk/1", url)) { + httr2::response(status_code = 400L, body = charToRaw("Bad Request")) + } else { + httr2::response(status_code = 200L, body = charToRaw("")) + } + }) + + expect_error( + CyteTypeR:::.upload_file_chunked( + "https://api.example.com", NULL, "obs_duckdb", f, 60L, + max_workers = 1L, show_progress = FALSE + ), + "Upload chunk.*failed" + ) +}) + # --- .make_results_request tests --- test_that(".make_results_request returns 'failed' when results endpoint errors", {