From f35664bc0d87da137fd8e5727d8b21e5ae897e25 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 5 Aug 2025 11:06:10 +0200 Subject: [PATCH 01/10] fix: add exponential retry logic to stackit embedder --- infrastructure/rag/values.yaml | 3 ++ .../impl/embeddings/stackit_embedder.py | 48 +++++++++++++++---- .../settings/stackit_embedder_settings.py | 9 ++++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index fb39a7b5..46d87545 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -192,6 +192,9 @@ backend: stackitEmbedder: STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct" STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1 + STACKIT_EMBEDDER_MAX_RETRIES: 10 + STACKIT_EMBEDDER_RETRY_BASE_DELAY: 1.0 + STACKIT_EMBEDDER_RETRY_MAX_DELAY: 60.0 ollama: OLLAMA_MODEL: "llama3.2:3b-instruct-fp16" OLLAMA_BASE_URL: "http://rag-ollama:11434" diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 65d67a1f..0fe5d76a 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -1,11 +1,16 @@ """Module that contains the StackitEmbedder class.""" +import time +import logging + from langchain_core.embeddings import Embeddings from openai import OpenAI from rag_core_api.embeddings.embedder import Embedder from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings +logger = logging.getLogger(__name__) + class StackitEmbedder(Embedder, Embeddings): """A class that represents any Langchain provided Embedder.""" @@ -36,24 +41,51 @@ def get_embedder(self) -> "StackitEmbedder": def embed_documents(self, texts: list[str]) -> list[list[float]]: """ - Embed a list of documents into numerical vectors. 
+ Embed a batch of texts with exponential backoff retry logic. Batching is handled by the vector + database client. Parameters ---------- texts : list of str - A list of documents to be embedded. + A batch of texts to be embedded. Returns ------- list[list[float]] - A list where each element is a list of floats representing the embedded vector of a document. + A list of embeddings for the batch. + + Raises + ------ + Exception + If all retry attempts fail. """ - responses = self._client.embeddings.create( - input=texts, - model=self._settings.model, - ) + if not texts: + return [] + + for attempt in range(self._settings.max_retries + 1): + try: + responses = self._client.embeddings.create( + input=texts, + model=self._settings.model, + ) + return [data.embedding for data in responses.data] + + except Exception as e: + if attempt == self._settings.max_retries: + logger.error("Failed to embed batch after %d attempts: %s", + self._settings.max_retries + 1, str(e)) + raise + + # Calculate exponential backoff delay + delay = min( + self._settings.retry_base_delay * (2 ** attempt), + self._settings.retry_max_delay + ) + + logger.warning("Embedding attempt %d/%d failed: %s. Retrying in %.2f seconds...", + attempt + 1, self._settings.max_retries + 1, str(e), delay) - return [data.embedding for data in responses.data] + time.sleep(delay) def embed_query(self, text: str) -> list[float]: """ diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index e451f06c..c6f956aa 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -17,6 +17,12 @@ class StackitEmbedderSettings(BaseSettings): (default "https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1"). api_key : str The API key for authentication. 
+ max_retries : int + Maximum number of retry attempts (default 10). + retry_base_delay : float + Base delay in seconds for exponential backoff (default 1.0). + retry_max_delay : float + Maximum delay in seconds between retries (default 60.0). """ class Config: @@ -28,3 +34,6 @@ class Config: model: str = Field(default="intfloat/e5-mistral-7b-instruct") base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1") api_key: str = Field(default="") + max_retries: int = Field(default=10, description="Maximum number of retry attempts") + retry_base_delay: float = Field(default=1.0, description="Base delay in seconds for exponential backoff") + retry_max_delay: float = Field(default=60.0, description="Maximum delay in seconds between retries") From 5bd72776bafef9144c1f778a88142ad5b9f50a8a Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 5 Aug 2025 11:16:00 +0200 Subject: [PATCH 02/10] refactor: with black --- .../impl/embeddings/stackit_embedder.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 0fe5d76a..2a9fc683 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -72,19 +72,20 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: except Exception as e: if attempt == self._settings.max_retries: - logger.error("Failed to embed batch after %d attempts: %s", - self._settings.max_retries + 1, str(e)) + logger.error("Failed to embed batch after %d attempts: %s", self._settings.max_retries + 1, str(e)) raise # Calculate exponential backoff delay - delay = min( - self._settings.retry_base_delay * (2 ** attempt), - self._settings.retry_max_delay + delay = min(self._settings.retry_base_delay * (2**attempt), 
self._settings.retry_max_delay) + + logger.warning( + "Embedding attempt %d/%d failed: %s. Retrying in %.2f seconds...", + attempt + 1, + self._settings.max_retries + 1, + str(e), + delay, ) - logger.warning("Embedding attempt %d/%d failed: %s. Retrying in %.2f seconds...", - attempt + 1, self._settings.max_retries + 1, str(e), delay) - time.sleep(delay) def embed_query(self, text: str) -> list[float]: From d89d3ceb9e93b6fa5d2f66242210fc7e4fa91716 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 6 Aug 2025 10:02:10 +0200 Subject: [PATCH 03/10] fix: update parameter type annotation for embed_documents method --- .../src/rag_core_api/impl/embeddings/stackit_embedder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 2a9fc683..12e84acf 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -46,7 +46,7 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: Parameters ---------- - texts : list of str + texts : list[str] A batch of texts to be embedded. 
Returns From 07ac8cb115983dbbce72d336ff33aaa3d559b3f1 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 14 Aug 2025 12:44:57 +0200 Subject: [PATCH 04/10] Update libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/rag_core_api/impl/embeddings/stackit_embedder.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 12e84acf..ae4aae83 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -76,7 +76,8 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: raise # Calculate exponential backoff delay - delay = min(self._settings.retry_base_delay * (2**attempt), self._settings.retry_max_delay) + # Calculate exponential backoff delay (cap exponent to prevent overflow) + delay = min(self._settings.retry_base_delay * (2**min(attempt, 10)), self._settings.retry_max_delay) logger.warning( "Embedding attempt %d/%d failed: %s. 
Retrying in %.2f seconds...", From c5ce1220f8e4708365cb228742c38d7488c3219e Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 14 Aug 2025 22:13:03 +0200 Subject: [PATCH 05/10] feat: enhance embed_documents with rate limit handling and exponential backoff --- .../impl/embeddings/stackit_embedder.py | 86 +++++++++++++++++-- 1 file changed, 78 insertions(+), 8 deletions(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index ae4aae83..682aa16d 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -2,9 +2,11 @@ import time import logging +import random from langchain_core.embeddings import Embeddings -from openai import OpenAI +from openai import OpenAI, APIError, APITimeoutError, RateLimitError, APIConnectionError +from fastapi import status from rag_core_api.embeddings.embedder import Embedder from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings @@ -15,6 +17,11 @@ class StackitEmbedder(Embedder, Embeddings): """A class that represents any Langchain provided Embedder.""" + ATTEMPT_CAP = 10 + BACKOFF_FACTOR = 2 + JITTER_MIN_SECONDS = 0.05 + JITTER_MAX_SECONDS = 0.25 + def __init__(self, stackit_embedder_settings: StackitEmbedderSettings): """ Initialize the StackitEmbedder with the given settings. 
@@ -63,21 +70,84 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: return [] for attempt in range(self._settings.max_retries + 1): + response = None try: - responses = self._client.embeddings.create( + response = self._client.embeddings.with_raw_response.create( input=texts, model=self._settings.model, ) - return [data.embedding for data in responses.data] + # Success - return embeddings + return [data.embedding for data in response.parse().data] + + except (APIError, RateLimitError, APITimeoutError, APIConnectionError) as e: + # Prefer rate-limit-aware retry using headers when available + resp = getattr(e, "response", None) + status_code = getattr(resp, "status_code", None) + raw_headers = getattr(resp, "headers", {}) or {} + # Normalize header keys to lowercase for case-insensitive access + headers = {str(k).lower(): v for k, v in raw_headers.items()} if isinstance(raw_headers, dict) else {} + + if isinstance(e, RateLimitError) or status_code == status.HTTP_429_TOO_MANY_REQUESTS: + # Use Stackit AI Model Serving rate limit headers to determine wait + # x-ratelimit-reset-requests / x-ratelimit-reset-tokens can be like "1s" or "1.5s" + def _to_seconds(v): + if v is None: + return None + try: + # Accept numbers, "1", "1.5", "1s" + s = str(v).strip().lower() + if s.endswith("s"): + return float(s[:-1]) + return float(s) + except Exception: + return None + + wait_candidates = [] + if headers: + request_reset = headers.get("x-ratelimit-reset-requests") + token_reset = headers.get("x-ratelimit-reset-tokens") + for v in (request_reset, token_reset): + fv = _to_seconds(v) + if fv is not None: + wait_candidates.append(fv) + + # Fallback to exponential backoff if headers missing + wait = ( + max(wait_candidates) if wait_candidates else min( + self._settings.retry_base_delay * (self.BACKOFF_FACTOR ** min(attempt, self.ATTEMPT_CAP)), + self._settings.retry_max_delay, + ) + ) + # Fixed jitter independent of wait to avoid thundering herd (50–250ms) + jitter = 
random.uniform(self.JITTER_MIN_SECONDS, self.JITTER_MAX_SECONDS) + total_wait = min(wait + jitter, self._settings.retry_max_delay) + + logger.warning( + "Rate limited (429). Remaining: req=%s tok=%s. Reset in: req=%s tok=%s. Retrying in %.2fs (attempt %d/%d)...", + headers.get("x-ratelimit-remaining-requests", "?"), + headers.get("x-ratelimit-remaining-tokens", "?"), + headers.get("x-ratelimit-reset-requests", "?"), + headers.get("x-ratelimit-reset-tokens", "?"), + total_wait, + attempt + 1, + self._settings.max_retries + 1, + ) + time.sleep(total_wait) + continue - except Exception as e: if attempt == self._settings.max_retries: - logger.error("Failed to embed batch after %d attempts: %s", self._settings.max_retries + 1, str(e)) + logger.error( + "Failed to embed batch after %d attempts: %s", + self._settings.max_retries + 1, + str(e), + ) raise - # Calculate exponential backoff delay - # Calculate exponential backoff delay (cap exponent to prevent overflow) - delay = min(self._settings.retry_base_delay * (2**min(attempt, 10)), self._settings.retry_max_delay) + # Exponential backoff for non-429 errors (timeouts, connections, etc.) + delay = min( + self._settings.retry_base_delay * (self.BACKOFF_FACTOR ** min(attempt, self.ATTEMPT_CAP)), + self._settings.retry_max_delay, + ) logger.warning( "Embedding attempt %d/%d failed: %s. 
Retrying in %.2f seconds...", From 5de11a2283ccbb80265ceb9b9c8e388cd20297dc Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 14 Aug 2025 22:18:48 +0200 Subject: [PATCH 06/10] chore: add verbose flag to npm install in Dockerfiles for better logging --- .../src/rag_core_api/impl/embeddings/stackit_embedder.py | 2 +- services/frontend/apps/admin-app/Dockerfile | 2 +- services/frontend/apps/chat-app/Dockerfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 682aa16d..8c54425b 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -118,7 +118,7 @@ def _to_seconds(v): self._settings.retry_max_delay, ) ) - # Fixed jitter independent of wait to avoid thundering herd (50–250ms) + # Jitter to avoid too many parallel requests at the same time jitter = random.uniform(self.JITTER_MIN_SECONDS, self.JITTER_MAX_SECONDS) total_wait = min(wait + jitter, self._settings.retry_max_delay) diff --git a/services/frontend/apps/admin-app/Dockerfile b/services/frontend/apps/admin-app/Dockerfile index d00f9e25..ce069b34 100644 --- a/services/frontend/apps/admin-app/Dockerfile +++ b/services/frontend/apps/admin-app/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /usr/src/app COPY ./services/frontend/package*.json ./ -RUN npm install +RUN npm install --verbose COPY ./services/frontend . diff --git a/services/frontend/apps/chat-app/Dockerfile b/services/frontend/apps/chat-app/Dockerfile index 7fd60b75..7074c8a8 100644 --- a/services/frontend/apps/chat-app/Dockerfile +++ b/services/frontend/apps/chat-app/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /usr/src/app COPY ./services/frontend/package*.json ./ -RUN npm install +RUN npm install --verbose COPY ./services/frontend . 
From c9623671b982aa04f0b9fdfd2ca3f8c27c13775e Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Fri, 15 Aug 2025 16:51:42 +0200 Subject: [PATCH 07/10] feat: refactor StackitEmbedder to use retry decorator for improved error handling and add username/password to Redis connection chore: remove unused KeyDB references and update related configurations fix: increase maximum retry delay for StackitEmbedder settings --- infrastructure/rag/Chart.lock | 7 +- infrastructure/rag/Chart.yaml | 5 - infrastructure/rag/values.yaml | 19 +-- .../key_db/file_status_key_value_store.py | 2 +- .../impl/settings/key_value_settings.py | 2 + .../impl/summarizer/langchain_summarizer.py | 61 ++++---- .../impl/embeddings/stackit_embedder.py | 124 ++++------------ .../settings/stackit_embedder_settings.py | 2 +- .../impl/settings/stackit_vllm_settings.py | 3 + .../impl/utils/retry_decorator.py | 138 ++++++++++++++++++ .../src/rag_core_lib/impl/utils/utils.py | 71 +++++++++ 11 files changed, 284 insertions(+), 150 deletions(-) create mode 100644 libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py create mode 100644 libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py diff --git a/infrastructure/rag/Chart.lock b/infrastructure/rag/Chart.lock index 0ea3ffc3..e17a8206 100644 --- a/infrastructure/rag/Chart.lock +++ b/infrastructure/rag/Chart.lock @@ -8,11 +8,8 @@ dependencies: - name: minio repository: https://charts.bitnami.com/bitnami version: 15.0.7 -- name: keydb - repository: https://enapter.github.io/charts/ - version: 0.48.0 - name: ollama repository: https://otwld.github.io/ollama-helm/ version: 1.24.0 -digest: sha256:30cc964388864bd738eb0668367576dbad45eb9d6a885cdc48ef7c07c9692aa0 -generated: "2025-08-04T09:23:39.885193+02:00" +digest: sha256:2d297b86ec5ea607319c691551dd99b658c03c5ba1ca9884b9b1ede4c0c445a8 +generated: "2025-08-15T15:23:26.079292+02:00" diff --git a/infrastructure/rag/Chart.yaml b/infrastructure/rag/Chart.yaml index 7e31093c..66403bd4 100644 --- 
a/infrastructure/rag/Chart.yaml +++ b/infrastructure/rag/Chart.yaml @@ -20,11 +20,6 @@ dependencies: repository: https://charts.bitnami.com/bitnami version: "15.0.7" condition: features.minio.enabled -- name: keydb - alias: keydb - repository: https://enapter.github.io/charts/ - version: "0.48.0" - condition: features.keydb.enabled - name: ollama alias: ollama version: 1.24.0 diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index e4dc92f3..2f8cefc3 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -9,8 +9,6 @@ features: enabled: true frontend: enabled: true - keydb: - enabled: true mcp: enabled: true @@ -194,7 +192,7 @@ backend: STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1 STACKIT_EMBEDDER_MAX_RETRIES: 10 STACKIT_EMBEDDER_RETRY_BASE_DELAY: 1.0 - STACKIT_EMBEDDER_RETRY_MAX_DELAY: 60.0 + STACKIT_EMBEDDER_RETRY_MAX_DELAY: 600.0 ollama: OLLAMA_MODEL: "llama3.2:3b-instruct-fp16" OLLAMA_BASE_URL: "http://rag-ollama:11434" @@ -324,7 +322,9 @@ adminBackend: CHUNKER_OVERLAP: 300 keyValueStore: USECASE_KEYVALUE_PORT: 6379 - USECASE_KEYVALUE_HOST: "rag-keydb" + USECASE_KEYVALUE_HOST: "rag-redis-primary" + USECASE_KEYVALUE_USERNAME: "default" + USECASE_KEYVALUE_PASSWORD: "MOqTsoa22R" sourceUploader: SOURCE_UPLOADER_TIMEOUT: 3600 @@ -529,12 +529,12 @@ langfuse: # Redis Configuration (external KeyDB) redis: - deploy: false - host: "rag-keydb" + deploy: true + host: "rag-redis-primary" port: 6379 auth: username: "default" - password: "" + password: "MOqTsoa22R" # ClickHouse Configuration (external ClickHouse) clickhouse: @@ -620,8 +620,3 @@ ollama: qdrant: image: tag: v1.14.1 - -keydb: - multiMaster: "no" - activeReplicas: "no" - nodes: 1 diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py b/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py index ef27b65a..591742f0 100644 --- 
a/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py @@ -39,7 +39,7 @@ def __init__(self, settings: KeyValueSettings): settings : KeyValueSettings The settings object containing the host and port information for the Redis connection. """ - self._redis = Redis(host=settings.host, port=settings.port, decode_responses=True) + self._redis = Redis(host=settings.host, port=settings.port, username=settings.username, password=settings.password, decode_responses=True) @staticmethod def _to_str(file_name: str, file_status: Status) -> str: diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py index 5acc6371..6465a4c1 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py @@ -24,3 +24,5 @@ class Config: host: str = Field() port: int = Field() + username: str = Field() + password: str = Field() diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index 49f39dcb..bc558e4c 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -2,7 +2,6 @@ import asyncio import logging -import traceback from typing import Optional from langchain.text_splitter import RecursiveCharacterTextSplitter @@ -16,6 +15,7 @@ ) from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff logger = logging.getLogger(__name__) @@ -28,8 +28,6 @@ class 
LangchainSummarizer(Summarizer): document and retries the summarization process if an error occurs. """ - RETRY_WAIT_TIME = 10 - def __init__( self, langfuse_manager: LangfuseManager, @@ -68,45 +66,42 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] """ assert query, "Query is empty: %s" % query # noqa S101 config = ensure_config(config) - tries_remaining = config.get("configurable", {}).get("tries_remaining", 3) - logger.debug("Tries remaining %d" % tries_remaining) - if tries_remaining < 0: - raise Exception("Summary creation failed.") document = Document(page_content=query) langchain_documents = self._chunker.split_documents([document]) + logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) - outputs = [] - for langchain_document in langchain_documents: - async with self._semaphore: - try: - result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) - except Exception as e: - logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc()) - config["tries_remaining"] = tries_remaining - 1 - if "rate limit" in str(e).lower() or "ratelimit" in str(e).lower(): - logger.warning( - "Rate limit encountered, waiting %d seconds before retry...", self.RETRY_WAIT_TIME - ) - await asyncio.sleep(self.RETRY_WAIT_TIME) - result = await self.ainvoke(query, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) + # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk + tasks = [ + asyncio.create_task(self._summarize_chunk(doc.page_content, config)) + for doc in langchain_documents + ] + outputs = await asyncio.gather(*tasks) if len(outputs) == 1: return outputs[0] - summary 
= " ".join(outputs) + + # Optional single reduce pass (no recursion) + merged = " ".join(outputs) logger.debug( - "Reduced number of chars from %d to %d" - % (len("".join([x.page_content for x in langchain_documents])), len(summary)) + "Reduced number of chars from %d to %d", + len("".join([x.page_content for x in langchain_documents])), + len(merged), ) - return await self.ainvoke(summary, config) + return await self._summarize_chunk(merged, config) def _create_chain(self) -> Runnable: - return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( - self.__class__.__name__ + return ( + self._langfuse_manager.get_base_prompt(self.__class__.__name__) + | self._langfuse_manager.get_base_llm(self.__class__.__name__) ) + + @retry_with_backoff(logger=logger) + async def _summarize_chunk_impl(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + response = await self._create_chain().ainvoke({"text": text}, config) + return response.content if hasattr(response, "content") else str(response) + + async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + # Hold the semaphore for the entire retry lifecycle + async with self._semaphore: + return await self._summarize_chunk_impl(text, config) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 8c54425b..cc63de50 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -1,15 +1,14 @@ """Module that contains the StackitEmbedder class.""" -import time import logging -import random from langchain_core.embeddings import Embeddings from openai import OpenAI, APIError, APITimeoutError, RateLimitError, APIConnectionError -from fastapi import status from rag_core_api.embeddings.embedder import Embedder from 
rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings +from rag_core_lib.impl.utils.retry_decorator import RetrySettings, retry_with_backoff + logger = logging.getLogger(__name__) @@ -69,95 +68,15 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: if not texts: return [] - for attempt in range(self._settings.max_retries + 1): - response = None - try: - response = self._client.embeddings.with_raw_response.create( - input=texts, - model=self._settings.model, - ) - # Success - return embeddings - return [data.embedding for data in response.parse().data] - - except (APIError, RateLimitError, APITimeoutError, APIConnectionError) as e: - # Prefer rate-limit-aware retry using headers when available - resp = getattr(e, "response", None) - status_code = getattr(resp, "status_code", None) - raw_headers = getattr(resp, "headers", {}) or {} - # Normalize header keys to lowercase for case-insensitive access - headers = {str(k).lower(): v for k, v in raw_headers.items()} if isinstance(raw_headers, dict) else {} - - if isinstance(e, RateLimitError) or status_code == status.HTTP_429_TOO_MANY_REQUESTS: - # Use Stackit AI Model Serving rate limit headers to determine wait - # x-ratelimit-reset-requests / x-ratelimit-reset-tokens can be like "1s" or "1.5s" - def _to_seconds(v): - if v is None: - return None - try: - # Accept numbers, "1", "1.5", "1s" - s = str(v).strip().lower() - if s.endswith("s"): - return float(s[:-1]) - return float(s) - except Exception: - return None - - wait_candidates = [] - if headers: - request_reset = headers.get("x-ratelimit-reset-requests") - token_reset = headers.get("x-ratelimit-reset-tokens") - for v in (request_reset, token_reset): - fv = _to_seconds(v) - if fv is not None: - wait_candidates.append(fv) - - # Fallback to exponential backoff if headers missing - wait = ( - max(wait_candidates) if wait_candidates else min( - self._settings.retry_base_delay * (self.BACKOFF_FACTOR ** min(attempt, 
self.ATTEMPT_CAP)), - self._settings.retry_max_delay, - ) - ) - # Jitter to avoid too many parallel requests at the same time - jitter = random.uniform(self.JITTER_MIN_SECONDS, self.JITTER_MAX_SECONDS) - total_wait = min(wait + jitter, self._settings.retry_max_delay) - - logger.warning( - "Rate limited (429). Remaining: req=%s tok=%s. Reset in: req=%s tok=%s. Retrying in %.2fs (attempt %d/%d)...", - headers.get("x-ratelimit-remaining-requests", "?"), - headers.get("x-ratelimit-remaining-tokens", "?"), - headers.get("x-ratelimit-reset-requests", "?"), - headers.get("x-ratelimit-reset-tokens", "?"), - total_wait, - attempt + 1, - self._settings.max_retries + 1, - ) - time.sleep(total_wait) - continue - - if attempt == self._settings.max_retries: - logger.error( - "Failed to embed batch after %d attempts: %s", - self._settings.max_retries + 1, - str(e), - ) - raise - - # Exponential backoff for non-429 errors (timeouts, connections, etc.) - delay = min( - self._settings.retry_base_delay * (self.BACKOFF_FACTOR ** min(attempt, self.ATTEMPT_CAP)), - self._settings.retry_max_delay, - ) - - logger.warning( - "Embedding attempt %d/%d failed: %s. Retrying in %.2f seconds...", - attempt + 1, - self._settings.max_retries + 1, - str(e), - delay, - ) - - time.sleep(delay) + @self._retry_wrapper() + def _call(batch: list[str]) -> list[list[float]]: + response = self._client.embeddings.with_raw_response.create( + input=batch, + model=self._settings.model, + ) + return [data.embedding for data in response.parse().data] + + return _call(texts) def embed_query(self, text: str) -> list[float]: """ @@ -173,4 +92,23 @@ def embed_query(self, text: str) -> list[float]: list[float] The embedded representation of the query text. 
""" - return self.embed_documents([text])[0] + embeddings = self.embed_documents([text])[0] + return embeddings if embeddings else [] + + def _retry_wrapper(self): + """Build a retry decorator *with runtime settings* from self._settings.""" + return retry_with_backoff( + settings=RetrySettings( + max_retries=self._settings.max_retries, + retry_base_delay=self._settings.retry_base_delay, + retry_max_delay=self._settings.retry_max_delay, + backoff_factor=self.BACKOFF_FACTOR, + attempt_cap=self.ATTEMPT_CAP, + jitter_min=self.JITTER_MIN_SECONDS, + jitter_max=self.JITTER_MAX_SECONDS, + ), + exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), + rate_limit_exceptions=(RateLimitError,), + logger=logger, + ) + diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index c6f956aa..7a026037 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -36,4 +36,4 @@ class Config: api_key: str = Field(default="") max_retries: int = Field(default=10, description="Maximum number of retry attempts") retry_base_delay: float = Field(default=1.0, description="Base delay in seconds for exponential backoff") - retry_max_delay: float = Field(default=60.0, description="Maximum delay in seconds between retries") + retry_max_delay: float = Field(default=600.0, description="Maximum delay in seconds between retries") diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py b/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py index ec0263bc..f1f6b133 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py @@ -1,5 +1,6 @@ """Module contains settings regarding the STACKIT vLLM.""" 
+from typing import Literal
 from pydantic import Field
 from pydantic_settings import BaseSettings

@@ -34,3 +35,5 @@ class Config:

     top_p: float = Field(default=0.1, title="LLM Top P")
     temperature: float = Field(default=0, title="LLM Temperature")
+    # Always include response headers; disallow overriding via env by constraining to Literal[True]
+    include_response_headers: Literal[True] = Field(default=True, title="Include Response Headers")
diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py
new file mode 100644
index 00000000..094ac655
--- /dev/null
+++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py
@@ -0,0 +1,163 @@
+"""Reusable exponential backoff / retry decorator for sync and async functions."""
+
+import asyncio
+import inspect
+import logging
+import random
+import time
+from dataclasses import dataclass
+from functools import wraps
+from typing import Callable, Optional, ParamSpec, Tuple, Type, TypeVar
+
+from rag_core_lib.impl.utils.utils import headers_from_exception, status_code_from_exception, wait_from_rate_limit_headers
+
+
+@dataclass
+class RetrySettings:
+    max_retries: int = 5  # total retries (not counting the first attempt)
+    retry_base_delay: float = 0.5  # seconds
+    retry_max_delay: float = 600.0  # cap for any single wait
+    backoff_factor: float = 2.0  # exponential factor
+    attempt_cap: int = 6  # cap exponent growth (2**attempt_cap)
+    jitter_min: float = 0.05  # seconds
+    jitter_max: float = 0.25  # seconds
+
+
+# Use ParamSpec and TypeVar for type-safe decorators
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def retry_with_backoff(
+    *,
+    settings: RetrySettings | None = None,
+    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
+    rate_limit_exceptions: Tuple[Type[BaseException], ...] = (),
+    rate_limit_statuses: Tuple[int, ...] = (429,),
+    rate_limit_header_names: Tuple[str, ...] = ("x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"),
+    is_rate_limited: Optional[Callable[[BaseException], bool]] = None,
+    logger: Optional[logging.Logger] = None,
+) -> Callable[[Callable[P, R]], Callable[P, R]]:
+    """
+    Apply robust retry logic with exponential backoff and optional rate-limit awareness.
+
+    Coroutine functions are wrapped with an async retry loop (``asyncio.sleep``),
+    all other callables with a blocking one (``time.sleep``).
+
+    Parameters
+    ----------
+    settings : RetrySettings, optional
+        Retry and backoff configuration; defaults to ``RetrySettings()``.
+    exceptions : tuple of exception types
+        Exceptions that trigger a retry; anything else propagates immediately.
+    rate_limit_exceptions : tuple of exception types
+        Exceptions that are always treated as rate limiting.
+    rate_limit_statuses : tuple of int
+        Response status codes that are treated as rate limiting.
+    rate_limit_header_names : tuple of str
+        Response headers consulted for the server-provided reset time.
+    is_rate_limited : callable, optional
+        Additional predicate that marks an exception as rate limited.
+    logger : logging.Logger, optional
+        Receives retry warnings and the final error; nothing is logged if omitted.
+
+    Returns
+    -------
+    Callable
+        A decorator that adds the retry loop to the decorated function.
+    """
+    cfg = settings or RetrySettings()
+
+    def _should_treat_as_rate_limited(exc: BaseException) -> bool:
+        if is_rate_limited and is_rate_limited(exc):
+            return True
+        if isinstance(exc, rate_limit_exceptions):
+            return True
+        status_code = status_code_from_exception(exc)
+        if status_code in rate_limit_statuses:
+            return True
+        msg = str(exc).lower()
+        return ("rate limit" in msg) or ("ratelimit" in msg)
+
+    def _compute_backoff_wait(attempt: int) -> float:
+        delay = cfg.retry_base_delay * (cfg.backoff_factor ** min(attempt, cfg.attempt_cap))
+        return min(delay, cfg.retry_max_delay)
+
+    def _with_jitter(seconds: float) -> float:
+        return min(seconds + random.uniform(cfg.jitter_min, cfg.jitter_max), cfg.retry_max_delay)
+
+    def _calculate_wait_time(attempt: int, exc: BaseException) -> float | None:
+        """
+        Centralizes the logic for calculating wait time.
+        Returns None if the exception should be re-raised immediately.
+        """
+        total_attempts = cfg.max_retries + 1
+        if attempt == cfg.max_retries:
+            if logger:
+                logger.error("Failed after %d attempts: %s", total_attempts, exc, exc_info=False)
+            return None  # Signal to re-raise
+
+        if _should_treat_as_rate_limited(exc):
+            headers = headers_from_exception(exc)
+            wait = wait_from_rate_limit_headers(headers, rate_limit_header_names)
+            if wait is None:
+                wait = _compute_backoff_wait(attempt)
+
+            final_wait = _with_jitter(wait)
+            if logger:
+                logger.warning(
+                    "Rate limited. Remaining: req=%s tok=%s. Reset in: req=%s tok=%s. Retrying in %.2fs (attempt %d/%d)...",
+                    headers.get("x-ratelimit-remaining-requests", "?"),
+                    headers.get("x-ratelimit-remaining-tokens", "?"),
+                    headers.get("x-ratelimit-reset-requests", "?"),
+                    headers.get("x-ratelimit-reset-tokens", "?"),
+                    final_wait,
+                    attempt + 1,
+                    total_attempts,
+                )
+                logger.warning("headers: %s", headers)
+            return final_wait
+        else:
+            delay = _compute_backoff_wait(attempt)
+            if logger:
+                logger.warning(
+                    "Attempt %d/%d failed: %s. Retrying in %.2fs...",
+                    attempt + 1,
+                    total_attempts,
+                    exc,
+                    delay,
+                    exc_info=False,
+                )
+            return delay
+
+    def decorator(fn: Callable[P, R]) -> Callable[P, R]:
+        if inspect.iscoroutinefunction(fn):
+            @wraps(fn)
+            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+                for attempt in range(cfg.max_retries + 1):
+                    try:
+                        return await fn(*args, **kwargs)
+                    except exceptions as exc:  # type: ignore[misc]
+                        wait_time = _calculate_wait_time(attempt, exc)
+                        if wait_time is None:
+                            raise
+                        await asyncio.sleep(wait_time)
+                # This line should be unreachable if max_retries >= 0
+                raise AssertionError("Retry loop exited unexpectedly.")
+            return async_wrapper
+        else:
+            @wraps(fn)
+            def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+                for attempt in range(cfg.max_retries + 1):
+                    try:
+                        return fn(*args, **kwargs)
+                    except exceptions as exc:  # type: ignore[misc]
+                        wait_time = _calculate_wait_time(attempt, exc)
+                        if wait_time is None:
+                            raise
+                        time.sleep(wait_time)
+                # This line should be unreachable if max_retries >= 0
+                raise AssertionError("Retry loop exited unexpectedly.")
+            return sync_wrapper
+
+    return decorator
diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py
new file mode 100644
index 00000000..f39f917b
--- /dev/null
+++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py
@@ -0,0 +1,106 @@
+"""Helpers for reading status codes and rate-limit hints from HTTP client exceptions."""
+
+import math
+import re
+from typing import Any, Iterable, Optional
+
+# Seconds per supported duration unit.
+_DURATION_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}
+# "ms" must come first in the alternation, otherwise "120ms" would be read as 120 minutes.
+_DURATION_TOKEN = re.compile(r"([0-9]+(?:\.[0-9]+)?)(ms|h|m|s)")
+# A duration consists of nothing but number/unit tokens, e.g. "1h21m55s" or "6m0s".
+_DURATION_VALUE = re.compile(r"(?:[0-9]+(?:\.[0-9]+)?(?:ms|h|m|s))+")
+
+
+def _to_seconds(v):
+    """
+    Convert a rate-limit reset header value into seconds.
+
+    Parameters
+    ----------
+    v : Any
+        Raw header value: a duration such as "1s", "120ms", "6m0s" or "1h21m55s",
+        or a plain number that is interpreted as seconds.
+
+    Returns
+    -------
+    float or None
+        The duration in seconds, or None if the value is missing, malformed, negative
+        or not finite, so that callers can fall back to their own backoff.
+    """
+    if v is None:
+        return None
+    s = str(v).strip().lower()
+    if _DURATION_VALUE.fullmatch(s):
+        return sum(float(val) * _DURATION_UNIT_SECONDS[unit] for val, unit in _DURATION_TOKEN.findall(s))
+    # Fallback: plain number interpreted as seconds
+    try:
+        seconds = float(s)
+    except ValueError:
+        return None
+    # A negative or non-finite wait cannot be slept on; report it as "no usable hint".
+    return seconds if math.isfinite(seconds) and seconds >= 0 else None
+
+
+def _normalize_headers(raw_headers: Any) -> dict[str, str]:
+    """
+    Return a lowercased dict[str, str] from httpx.Headers or mapping-like objects.
+
+    Best effort: input that cannot be converted yields an empty dict, and entries
+    whose key or value cannot be stringified are skipped.
+    """
+    if not raw_headers:
+        return {}
+    try:
+        if hasattr(raw_headers, "items"):
+            items = list(raw_headers.items())  # works for dict-like and httpx.Headers
+        else:
+            items = list(dict(raw_headers).items())
+    except Exception:
+        try:
+            items = list(dict(raw_headers).items())
+        except Exception:
+            items = []
+    out: dict[str, str] = {}
+    for k, v in items:
+        try:
+            out[str(k).lower()] = str(v)
+        except Exception:
+            continue
+    return out
+
+
+def status_code_from_exception(exc: BaseException) -> Optional[int]:
+    """Return ``exc.response.status_code`` if available, otherwise None."""
+    resp = getattr(exc, "response", None)
+    return getattr(resp, "status_code", None)
+
+
+def headers_from_exception(exc: BaseException) -> dict[str, str]:
+    """Return the normalized headers of ``exc.response``, or an empty dict if there are none."""
+    resp = getattr(exc, "response", None)
+    raw = getattr(resp, "headers", None)
+    return _normalize_headers(raw)
+
+
+def wait_from_rate_limit_headers(
+    headers: dict[str, str],
+    header_names: Iterable[str] = ("x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"),
+) -> Optional[float]:
+    """
+    Return the longest reset time, in seconds, announced by the given headers.
+
+    Parameters
+    ----------
+    headers : dict[str, str]
+        Normalized (lowercased) response headers.
+    header_names : Iterable[str]
+        Names of the headers that carry a reset duration.
+
+    Returns
+    -------
+    float or None
+        The largest parsed duration, or None if no header holds a usable value.
+    """
+    parsed = (_to_seconds(headers.get(name)) for name in header_names)
+    return max((sec for sec in parsed if sec is not None), default=None)

From 2786fa0f128bfaeb5396087c7ea56b725161fcaa Mon Sep 17 00:00:00 2001
From: Andreas Klos
Date: Mon, 1 Sep 2025 15:27:50 +0200
Subject: [PATCH 08/10] fix: improve embed_query method to handle empty embeddings list

---
.../src/rag_core_api/impl/embeddings/stackit_embedder.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index cc63de50..a7b6c03a 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -92,8 +92,11 @@ def embed_query(self, text: str) -> list[float]: list[float] The embedded representation of the query text. """ - embeddings = self.embed_documents([text])[0] - return embeddings if embeddings else [] + embeddings_list = self.embed_documents([text]) + if embeddings_list: + embeddings = embeddings_list[0] + return embeddings if embeddings else [] + return [] def _retry_wrapper(self): """Build a retry decorator *with runtime settings* from self._settings.""" From 240c1a731d2e5d2d866cad655d1d76765af81986 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Mon, 1 Sep 2025 15:28:29 +0200 Subject: [PATCH 09/10] fix: update embed_query method to return original embeddings list when not empty --- .../src/rag_core_api/impl/embeddings/stackit_embedder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index a7b6c03a..3069f19b 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -96,7 +96,7 @@ def embed_query(self, text: str) -> list[float]: if embeddings_list: embeddings = embeddings_list[0] return embeddings if embeddings else [] - return [] + return embeddings_list def _retry_wrapper(self): """Build a retry decorator *with runtime settings* from self._settings.""" From 5d4d5605146b7e78638f06819fdc4a45741a1e17 Mon Sep 17 00:00:00 2001 
From: Andreas Klos Date: Mon, 1 Sep 2025 15:37:58 +0200 Subject: [PATCH 10/10] fix: clarify comment on max_retries in RetrySettings --- .../rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index 094ac655..46a439ed 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -14,7 +14,7 @@ @dataclass class RetrySettings: - max_retries: int = 5 # total retries (not counting the first attempt) + max_retries: int = 5 # total retries (not counting the first attempt, should be > 0) retry_base_delay: float = 0.5 # seconds retry_max_delay: float = 600.0 # cap for any single wait backoff_factor: float = 2.0 # exponential factor