Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion flowbio/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
"""
from flowbio.v2.auth import TokenCredentials, UsernamePasswordCredentials
from flowbio.v2.client import Client, ClientConfig
from flowbio.v2.samples import MetadataAttribute, Organism, Project, Sample, SampleType
from flowbio.v2.exceptions import AnnotationValidationError
from flowbio.v2.samples import MetadataAttribute, MultiplexedUpload, Organism, Project, Sample, SampleType

__all__ = [
"AnnotationValidationError",
"Client",
"ClientConfig",
"MetadataAttribute",
"MultiplexedUpload",
"Organism",
"Project",
"Sample",
Expand Down
9 changes: 5 additions & 4 deletions flowbio/v2/_transport.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from http import HTTPStatus
from importlib.metadata import PackageNotFoundError, version

import httpx
Expand Down Expand Up @@ -37,17 +38,17 @@ def __init__(self, base_url: str, connection_retries: int = 3) -> None:
)

_STATUS_TO_EXCEPTION: dict[int, type[FlowApiError]] = {
400: BadRequestError,
401: AuthenticationError,
404: NotFoundError,
HTTPStatus.BAD_REQUEST: BadRequestError,
HTTPStatus.UNAUTHORIZED: AuthenticationError,
HTTPStatus.NOT_FOUND: NotFoundError,
}

def _raise_for_error(self, response: httpx.Response) -> None:
if response.is_success:
return

body = response.json()
message = body.get("error", "Unknown error")
message = body.get("error", body)
exception_class = self._STATUS_TO_EXCEPTION.get(
response.status_code, FlowApiError,
)
Expand Down
19 changes: 19 additions & 0 deletions flowbio/v2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
HTTP status code and error message from the response.
"""

from http import HTTPStatus


class FlowApiError(Exception):
"""Base exception for all Flow API errors.
Expand Down Expand Up @@ -35,6 +37,23 @@ class BadRequestError(FlowApiError):
pass


class AnnotationValidationError(BadRequestError):
    """Raised when an annotation upload is rejected with hard validation errors.

    Status-400 responses from the annotation endpoint carry a
    ``{"validation": [...]}`` payload for errors that cannot be ignored
    (warnings, by contrast, may be accepted).

    :param errors: List of validation error dicts from the response.
    """

    def __init__(self, errors: list[dict]) -> None:
        # Keep the raw error dicts so callers can inspect row-level details.
        self.errors = errors
        summary = f"Annotation has {len(errors)} validation error(s)"
        super().__init__(HTTPStatus.BAD_REQUEST, summary)


class NotFoundError(FlowApiError):
"""Raised when the API returns a 404 Not Found response."""

Expand Down
176 changes: 145 additions & 31 deletions flowbio/v2/samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from tqdm import tqdm

from flowbio.v2._pagination import PageIterator
from flowbio.v2.exceptions import AnnotationValidationError, BadRequestError

if TYPE_CHECKING:
from flowbio.v2.client import ClientConfig
Expand Down Expand Up @@ -138,6 +139,20 @@ class Sample(BaseModel, frozen=True):
id: str


class MultiplexedUpload(BaseModel, frozen=True):
    """Result of a multiplexed data upload.

    :param data_ids: IDs for the uploaded multiplexed reads data.
    :param annotation_id: ID for the uploaded annotation data.
    :param warnings: Annotation warnings returned by the server.
        Empty if the annotation was accepted without warnings.
    """

    # One ID per uploaded reads file (reads1 first, then reads2 if present).
    data_ids: list[str]
    # ID of the uploaded annotation sheet.
    annotation_id: str
    # Server-reported warnings that were accepted; empty list when none.
    warnings: list[dict]


class SampleResource:
"""Provides access to sample-related API endpoints.

Expand Down Expand Up @@ -214,18 +229,98 @@ def upload_sample(
result: dict = {}
for file_index, (data_type, file_path) in enumerate(files):
is_last_file = file_index == len(files) - 1
result = self._upload_file(
file_path=file_path,
is_last_file=is_last_file,
previous_data_ids=previous_data_ids,
sample_fields=self._build_sample_fields(
name, sample_type, metadata, project_id, organism_id,
),
fields = self._build_sample_fields(
name, sample_type, metadata, project_id, organism_id,
)
result = self._upload_in_chunks(
"/upload/sample",
file_path,
extra_fields={
"is_last_sample": is_last_file,
"previous_data": previous_data_ids,
**(fields or {}),
},
)
if not is_last_file:
previous_data_ids.append(result["data_id"])
return Sample(id=result["sample_id"])

def upload_multiplexed_data(
    self,
    reads: dict[str, Path],
    annotation: Path,
    ignore_warnings: bool = True,
) -> MultiplexedUpload:
    """Upload multiplexed reads and an annotation sheet.

    Validates and uploads the annotation sheet first, so that reads
    files are not uploaded if the annotation is invalid. Then uploads
    one or two reads files to ``/upload/multiplexed``.

    By default, annotation warnings are automatically accepted (the
    upload is retried with ``ignore_warnings=True``) and included in
    the result for inspection. Set ``ignore_warnings=False`` to
    reject the upload on warnings instead.

    Requires authentication.

    Example::

        from pathlib import Path

        result = client.samples.upload_multiplexed_data(
            reads={"reads1": Path("multiplexed_R1.fastq.gz")},
            annotation=Path("annotation.xlsx"),
        )
        print(f"Data IDs: {result.data_ids}")
        print(f"Annotation ID: {result.annotation_id}")
        if result.warnings:
            print(f"Warnings: {result.warnings}")

    :param reads: A mapping of reads keys to file paths. Use
        ``reads1`` for single-end, or ``reads1`` and ``reads2`` for
        paired-end. ``reads1`` is always uploaded first::

            # Single-end
            {"reads1": Path("multiplexed.fastq.gz")}

            # Paired-end
            {"reads1": Path("R1.fastq.gz"), "reads2": Path("R2.fastq.gz")}

    :param annotation: Path to the annotation sheet (``.xlsx`` or
        ``.csv``). Use :meth:`get_annotation_template` to download a
        template.
    :param ignore_warnings: If ``True`` (the default), annotation
        warnings are automatically accepted and included in the
        result. If ``False``, warnings cause an
        :class:`AnnotationValidationError` to be raised.
    :raises ValueError: If reads keys are invalid (e.g. ``reads3``)
        or ``reads2`` is provided without ``reads1``.
    :raises AnnotationValidationError: If the annotation has hard
        validation errors that cannot be ignored, or if
        ``ignore_warnings=False`` and the annotation has warnings.
    """
    # Validate the reads mapping up front; raises ValueError on bad keys.
    files = self._ordered_files(reads)

    # Annotation goes first so reads are never uploaded for a bad sheet.
    annotation_id, warnings = self._upload_annotation(
        annotation, ignore_warnings,
    )

    data_ids: list[str] = []
    for _, file_path in files:
        # For the second file, tell the server which data is reads1.
        extra_fields = {"reads1": data_ids[0]} if data_ids else {}
        result = self._upload_in_chunks(
            "/upload/multiplexed", file_path, extra_fields,
        )
        data_ids.append(result["id"])

    return MultiplexedUpload(
        data_ids=data_ids,
        annotation_id=annotation_id,
        warnings=warnings,
    )

def get_annotation_template(self, sample_type: str = "generic") -> bytes:
"""Download an annotation sheet template for multiplexed uploads.

Expand Down Expand Up @@ -325,12 +420,11 @@ def _create_metadata_attribute(self, item: dict) -> MetadataAttribute:
item["options"] = self._resolve_options(item)
return MetadataAttribute(**item)

def _upload_file(
def _upload_in_chunks(
self,
endpoint: str,
file_path: Path,
is_last_file: bool,
previous_data_ids: list[str],
sample_fields: dict[str, str],
extra_fields: dict | None = None,
) -> dict:
chunk_size = self._config.chunk_size
file_size = file_path.stat().st_size
Expand All @@ -344,29 +438,49 @@ def _upload_file(
desc=f"Uploading {file_path.name}",
unit="chunk",
)
for chunk_index in chunks:
is_last_chunk = chunk_index == num_chunks - 1
is_last_sample = is_last_file and is_last_chunk
with open(file_path, "rb") as f:
f.seek(chunk_index * chunk_size)
with open(file_path, "rb") as f:
for chunk_index in chunks:
is_last_chunk = chunk_index == num_chunks - 1
form_data: dict[str, str | bool | list[str] | None] = {
"filename": file_path.name,
# API uses this as a byte offset to verify upload resumption
"expected_file_size": str(chunk_index * chunk_size),
"is_last": is_last_chunk,
"data": data_id,
**(extra_fields or {}),
}
chunk = f.read(chunk_size)
form_data: dict[str, str] = {
"filename": file_path.name,
"expected_file_size": str(chunk_index * chunk_size),
"is_last": is_last_chunk,
"data": data_id,
"is_last_sample": is_last_sample,
"previous_data": previous_data_ids,
**(sample_fields or {}),
}
result = self._transport.post(
"/upload/sample",
data=form_data,
files={"blob": (file_path.name, chunk, "application/octet-stream")},
)
data_id = result["data_id"]
result = self._transport.post(
endpoint,
data=form_data,
files={"blob": (file_path.name, chunk, "application/octet-stream")},
)
data_id = result.get("data_id") or result.get("id")
return result

def _upload_annotation(
    self, file_path: Path, ignore_warnings: bool,
) -> tuple[str, list[dict]]:
    """Upload the annotation sheet, translating server-side rejections.

    Returns the uploaded annotation's ID together with any warnings
    that were accepted (empty list when the upload succeeded cleanly).

    :raises AnnotationValidationError: On hard validation errors, or on
        warnings when ``ignore_warnings`` is ``False``.
    """
    try:
        uploaded = self._upload_in_chunks("/upload/annotation", file_path)
    except BadRequestError as e:
        body = e.message
        # Anything that isn't the structured validation payload is not ours
        # to translate — propagate the original error untouched.
        if not isinstance(body, dict):
            raise
        if "validation" in body:
            # Hard errors: the server will never accept this sheet.
            raise AnnotationValidationError(errors=body["validation"]) from e
        if "warnings" not in body:
            raise
        if not ignore_warnings:
            raise AnnotationValidationError(errors=body["warnings"]) from e
        # Soft warnings: retry once, explicitly accepting them.
        retried = self._upload_in_chunks(
            "/upload/annotation",
            file_path,
            extra_fields={"ignore_warnings": True},
        )
        return retried["id"], body["warnings"]
    else:
        return uploaded["id"], []

_VALID_READS_KEYS = {"reads1", "reads2"}

@staticmethod
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/v2/test_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from flowbio.v2.exceptions import (
AnnotationValidationError,
AuthenticationError,
BadRequestError,
FlowApiError,
Expand Down Expand Up @@ -73,6 +74,34 @@ def test_stores_status_code_and_message(self) -> None:
assert error.message == message


class TestAnnotationValidationError:
    """Behavioral contract of :class:`AnnotationValidationError`."""

    def test_is_bad_request_error(self) -> None:
        # Must slot into the existing hierarchy so code catching
        # BadRequestError still sees annotation failures.
        exc = AnnotationValidationError(
            errors=[{"row": 1, "message": "Invalid scientist"}],
        )
        assert isinstance(exc, BadRequestError)

    def test_stores_validation_errors(self) -> None:
        payload = [
            {"row": 1, "message": "Invalid scientist"},
            {"row": 2, "message": "Missing barcode"},
        ]
        assert AnnotationValidationError(errors=payload).errors == payload

    def test_str_includes_error_count(self) -> None:
        rendered = str(
            AnnotationValidationError(
                errors=[{"row": 1, "message": "Invalid scientist"}],
            )
        )
        assert "1" in rendered
        assert "validation" in rendered.lower()


class TestNotFoundError:

def test_is_flow_api_error(self) -> None:
Expand Down
Loading