diff --git a/infrastructure/rag/Chart.lock b/infrastructure/rag/Chart.lock index 0ea3ffc3..e17a8206 100644 --- a/infrastructure/rag/Chart.lock +++ b/infrastructure/rag/Chart.lock @@ -8,11 +8,8 @@ dependencies: - name: minio repository: https://charts.bitnami.com/bitnami version: 15.0.7 -- name: keydb - repository: https://enapter.github.io/charts/ - version: 0.48.0 - name: ollama repository: https://otwld.github.io/ollama-helm/ version: 1.24.0 -digest: sha256:30cc964388864bd738eb0668367576dbad45eb9d6a885cdc48ef7c07c9692aa0 -generated: "2025-08-04T09:23:39.885193+02:00" +digest: sha256:2d297b86ec5ea607319c691551dd99b658c03c5ba1ca9884b9b1ede4c0c445a8 +generated: "2025-08-15T15:23:26.079292+02:00" diff --git a/infrastructure/rag/Chart.yaml b/infrastructure/rag/Chart.yaml index 7e31093c..66403bd4 100644 --- a/infrastructure/rag/Chart.yaml +++ b/infrastructure/rag/Chart.yaml @@ -20,11 +20,6 @@ dependencies: repository: https://charts.bitnami.com/bitnami version: "15.0.7" condition: features.minio.enabled -- name: keydb - alias: keydb - repository: https://enapter.github.io/charts/ - version: "0.48.0" - condition: features.keydb.enabled - name: ollama alias: ollama version: 1.24.0 diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index 182db5eb..2f8cefc3 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -9,8 +9,6 @@ features: enabled: true frontend: enabled: true - keydb: - enabled: true mcp: enabled: true @@ -70,7 +68,7 @@ backend: image: repository: ghcr.io/stackitcloud/rag-template/mcp-server pullPolicy: Always - tag: "latest" + tag: "v2.1.0" name: backend replicaCount: 1 @@ -78,7 +76,7 @@ backend: image: repository: ghcr.io/stackitcloud/rag-template/rag-backend pullPolicy: Always - tag: "latest" + tag: "v2.1.0" command: - "poetry" @@ -192,6 +190,9 @@ backend: stackitEmbedder: STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct" STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1 + STACKIT_EMBEDDER_MAX_RETRIES: 10 + STACKIT_EMBEDDER_RETRY_BASE_DELAY: 1.0 + STACKIT_EMBEDDER_RETRY_MAX_DELAY: 600.0 ollama: OLLAMA_MODEL: "llama3.2:3b-instruct-fp16" OLLAMA_BASE_URL: "http://rag-ollama:11434" @@ -216,7 +217,7 @@ frontend: image: repository: ghcr.io/stackitcloud/rag-template/frontend pullPolicy: Always - tag: "latest" + tag: "v2.1.0" service: type: ClusterIP @@ -251,7 +252,7 @@ adminBackend: image: repository: ghcr.io/stackitcloud/rag-template/admin-backend pullPolicy: Always - tag: "latest" + tag: "v2.1.0" command: - "poetry" @@ -321,7 +322,9 @@ adminBackend: CHUNKER_OVERLAP: 300 keyValueStore: USECASE_KEYVALUE_PORT: 6379 - USECASE_KEYVALUE_HOST: "rag-keydb" + USECASE_KEYVALUE_HOST: "rag-redis-primary" + USECASE_KEYVALUE_USERNAME: "default" + USECASE_KEYVALUE_PASSWORD: "MOqTsoa22R" sourceUploader: SOURCE_UPLOADER_TIMEOUT: 3600 @@ -331,7 +334,7 @@ extractor: image: repository: ghcr.io/stackitcloud/rag-template/document-extractor pullPolicy: Always - tag: "latest" + tag: "v2.1.0" command: - "poetry" @@ -379,7 +382,7 @@ adminFrontend: image: repository: ghcr.io/stackitcloud/rag-template/admin-frontend pullPolicy: Always - tag: "latest" + tag: "v2.1.0" service: type: ClusterIP @@ -526,12 +529,12 @@ langfuse: # Redis Configuration (external KeyDB) redis: - deploy: false - host: "rag-keydb" + deploy: true + host: "rag-redis-primary" port: 6379 auth: username: "default" - password: "" + password: "MOqTsoa22R" # ClickHouse Configuration (external ClickHouse) clickhouse: @@ -617,8 +620,3 @@ ollama: qdrant: image: tag: v1.14.1 - -keydb: - multiMaster: "no" - activeReplicas: "no" - nodes: 1 diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py b/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py index ef27b65a..591742f0 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py @@ -39,7 +39,7 @@ def __init__(self, settings: KeyValueSettings): settings : KeyValueSettings The settings object containing the host and port information for the Redis connection. """ - self._redis = Redis(host=settings.host, port=settings.port, decode_responses=True) + self._redis = Redis(host=settings.host, port=settings.port, username=settings.username, password=settings.password, decode_responses=True) @staticmethod def _to_str(file_name: str, file_status: Status) -> str: diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py index 5acc6371..6465a4c1 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/key_value_settings.py @@ -24,3 +24,5 @@ class Config: host: str = Field() port: int = Field() + username: str = Field() + password: str = Field() diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index 49f39dcb..bc558e4c 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -2,7 +2,6 @@ import asyncio import logging -import traceback from typing import Optional from langchain.text_splitter import RecursiveCharacterTextSplitter @@ -16,6 +15,7 @@ ) from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff logger = logging.getLogger(__name__) @@ -28,8 +28,6 @@ class LangchainSummarizer(Summarizer): document and retries the summarization process if an error occurs. """ - RETRY_WAIT_TIME = 10 - def __init__( self, langfuse_manager: LangfuseManager, @@ -68,45 +66,42 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] """ assert query, "Query is empty: %s" % query # noqa S101 config = ensure_config(config) - tries_remaining = config.get("configurable", {}).get("tries_remaining", 3) - logger.debug("Tries remaining %d" % tries_remaining) - if tries_remaining < 0: - raise Exception("Summary creation failed.") document = Document(page_content=query) langchain_documents = self._chunker.split_documents([document]) + logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) - outputs = [] - for langchain_document in langchain_documents: - async with self._semaphore: - try: - result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) - except Exception as e: - logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc()) - config["tries_remaining"] = tries_remaining - 1 - if "rate limit" in str(e).lower() or "ratelimit" in str(e).lower(): - logger.warning( - "Rate limit encountered, waiting %d seconds before retry...", self.RETRY_WAIT_TIME - ) - await asyncio.sleep(self.RETRY_WAIT_TIME) - result = await self.ainvoke(query, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) + # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk + tasks = [ + asyncio.create_task(self._summarize_chunk(doc.page_content, config)) + for doc in langchain_documents + ] + outputs = await asyncio.gather(*tasks) if len(outputs) == 1: return outputs[0] - summary = " ".join(outputs) + + # Optional single reduce pass (no recursion) + merged = " ".join(outputs) logger.debug( - "Reduced number of chars from %d to %d" - % (len("".join([x.page_content for x in langchain_documents])), len(summary)) + "Reduced number of chars from %d to %d", + len("".join([x.page_content for x in langchain_documents])), + len(merged), ) - return await self.ainvoke(summary, config) + return await self._summarize_chunk(merged, config) def _create_chain(self) -> Runnable: - return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( - self.__class__.__name__ + return ( + self._langfuse_manager.get_base_prompt(self.__class__.__name__) + | self._langfuse_manager.get_base_llm(self.__class__.__name__) ) + + @retry_with_backoff(logger=logger) + async def _summarize_chunk_impl(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + response = await self._create_chain().ainvoke({"text": text}, config) + return response.content if hasattr(response, "content") else str(response) + + async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + # Hold the semaphore for the entire retry lifecycle + async with self._semaphore: + return await self._summarize_chunk_impl(text, config) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 65d67a1f..3069f19b 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -1,15 +1,26 @@ """Module that contains the StackitEmbedder class.""" +import logging + from langchain_core.embeddings import Embeddings -from openai import OpenAI +from openai import OpenAI, APIError, APITimeoutError, RateLimitError, APIConnectionError from rag_core_api.embeddings.embedder import Embedder from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings +from rag_core_lib.impl.utils.retry_decorator import RetrySettings, retry_with_backoff + + +logger = logging.getLogger(__name__) class StackitEmbedder(Embedder, Embeddings): """A class that represents any Langchain provided Embedder.""" + ATTEMPT_CAP = 10 + BACKOFF_FACTOR = 2 + JITTER_MIN_SECONDS = 0.05 + JITTER_MAX_SECONDS = 0.25 + def __init__(self, stackit_embedder_settings: StackitEmbedderSettings): """ Initialize the StackitEmbedder with the given settings. @@ -36,24 +47,36 @@ def get_embedder(self) -> "StackitEmbedder": def embed_documents(self, texts: list[str]) -> list[list[float]]: """ - Embed a list of documents into numerical vectors. + Embed a batch of texts with exponential backoff retry logic. Batching is handled by the vector + database client. Parameters ---------- - texts : list of str - A list of documents to be embedded. + texts : list[str] + A batch of texts to be embedded. Returns ------- list[list[float]] - A list where each element is a list of floats representing the embedded vector of a document. + A list of embeddings for the batch. + + Raises + ------ + Exception + If all retry attempts fail. """ - responses = self._client.embeddings.create( - input=texts, - model=self._settings.model, - ) + if not texts: + return [] + + @self._retry_wrapper() + def _call(batch: list[str]) -> list[list[float]]: + response = self._client.embeddings.with_raw_response.create( + input=batch, + model=self._settings.model, + ) + return [data.embedding for data in response.parse().data] - return [data.embedding for data in responses.data] + return _call(texts) def embed_query(self, text: str) -> list[float]: """ @@ -69,4 +92,26 @@ def embed_query(self, text: str) -> list[float]: list[float] The embedded representation of the query text. """ - return self.embed_documents([text])[0] + embeddings_list = self.embed_documents([text]) + if embeddings_list: + embeddings = embeddings_list[0] + return embeddings if embeddings else [] + return embeddings_list + + def _retry_wrapper(self): + """Build a retry decorator *with runtime settings* from self._settings.""" + return retry_with_backoff( + settings=RetrySettings( + max_retries=self._settings.max_retries, + retry_base_delay=self._settings.retry_base_delay, + retry_max_delay=self._settings.retry_max_delay, + backoff_factor=self.BACKOFF_FACTOR, + attempt_cap=self.ATTEMPT_CAP, + jitter_min=self.JITTER_MIN_SECONDS, + jitter_max=self.JITTER_MAX_SECONDS, + ), + exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), + rate_limit_exceptions=(RateLimitError,), + logger=logger, + ) + diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index e451f06c..7a026037 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -17,6 +17,12 @@ class StackitEmbedderSettings(BaseSettings): (default "https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1"). api_key : str The API key for authentication. + max_retries : int + Maximum number of retry attempts (default 10). + retry_base_delay : float + Base delay in seconds for exponential backoff (default 1.0). + retry_max_delay : float + Maximum delay in seconds between retries (default 60.0). """ class Config: @@ -28,3 +34,6 @@ class Config: model: str = Field(default="intfloat/e5-mistral-7b-instruct") base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1") api_key: str = Field(default="") + max_retries: int = Field(default=10, description="Maximum number of retry attempts") + retry_base_delay: float = Field(default=1.0, description="Base delay in seconds for exponential backoff") + retry_max_delay: float = Field(default=600.0, description="Maximum delay in seconds between retries") diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py b/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py index ec0263bc..f1f6b133 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/settings/stackit_vllm_settings.py @@ -1,5 +1,6 @@ """Module contains settings regarding the STACKIT vLLM.""" +from typing import Literal from pydantic import Field from pydantic_settings import BaseSettings @@ -34,3 +35,5 @@ class Config: top_p: float = Field(default=0.1, title="LLM Top P") temperature: float = Field(default=0, title="LLM Temperature") + # Always include response headers; disallow overriding via env by constraining to Literal[True] + include_response_headers: Literal[True] = Field(default=True, title="Include Response Headers") diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py new file mode 100644 index 00000000..46a439ed --- /dev/null +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -0,0 +1,138 @@ +"""Reusable exponential backoff / retry decorator for sync and async functions.""" + +import asyncio +import inspect +import logging +import random +import time +from dataclasses import dataclass +from functools import wraps +from typing import Callable, Optional, ParamSpec, Tuple, Type, TypeVar + +from rag_core_lib.impl.utils.utils import headers_from_exception, status_code_from_exception, wait_from_rate_limit_headers + + +@dataclass +class RetrySettings: + max_retries: int = 5 # total retries (not counting the first attempt, should be > 0) + retry_base_delay: float = 0.5 # seconds + retry_max_delay: float = 600.0 # cap for any single wait + backoff_factor: float = 2.0 # exponential factor + attempt_cap: int = 6 # cap exponent growth (2**attempt_cap) + jitter_min: float = 0.05 # seconds + jitter_max: float = 0.25 # seconds + + +# Use ParamSpec and TypeVar for type-safe decorators +P = ParamSpec("P") +R = TypeVar("R") + + +def retry_with_backoff( + *, + settings: RetrySettings | None = None, + exceptions: Tuple[Type[BaseException], ...] = (Exception,), + rate_limit_exceptions: Tuple[Type[BaseException], ...] = (), + rate_limit_statuses: Tuple[int, ...] = (429,), + rate_limit_header_names: Tuple[str, ...] = ("x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"), + is_rate_limited: Optional[Callable[[BaseException], bool]] = None, + logger: Optional[logging.Logger] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Apply robust retry logic with exponential backoff and optional rate-limit awareness. + """ + cfg = settings or RetrySettings() + + def _should_treat_as_rate_limited(exc: BaseException) -> bool: + if is_rate_limited and is_rate_limited(exc): + return True + if isinstance(exc, rate_limit_exceptions): + return True + status_code = status_code_from_exception(exc) + if status_code in rate_limit_statuses: + return True + msg = str(exc).lower() + return ("rate limit" in msg) or ("ratelimit" in msg) + + def _compute_backoff_wait(attempt: int) -> float: + delay = cfg.retry_base_delay * (cfg.backoff_factor ** min(attempt, cfg.attempt_cap)) + return min(delay, cfg.retry_max_delay) + + def _with_jitter(seconds: float) -> float: + return min(seconds + random.uniform(cfg.jitter_min, cfg.jitter_max), cfg.retry_max_delay) + + def _calculate_wait_time(attempt: int, exc: BaseException) -> float | None: + """ + Centralizes the logic for calculating wait time. + Returns None if the exception should be re-raised immediately. + """ + total_attempts = cfg.max_retries + 1 + if attempt == cfg.max_retries: + if logger: + logger.error("Failed after %d attempts: %s", total_attempts, exc, exc_info=False) + return None # Signal to re-raise + + if _should_treat_as_rate_limited(exc): + headers = headers_from_exception(exc) + wait = wait_from_rate_limit_headers(headers, rate_limit_header_names) + if wait is None: + wait = _compute_backoff_wait(attempt) + + final_wait = _with_jitter(wait) + if logger: + logger.warning( + "Rate limited. Remaining: req=%s tok=%s. Reset in: req=%s tok=%s. Retrying in %.2fs (attempt %d/%d)...", + headers.get("x-ratelimit-remaining-requests", "?"), + headers.get("x-ratelimit-remaining-tokens", "?"), + headers.get("x-ratelimit-reset-requests", "?"), + headers.get("x-ratelimit-reset-tokens", "?"), + final_wait, + attempt + 1, + total_attempts, + ) + logger.warning("headers: %s", headers) + return final_wait + else: + delay = _compute_backoff_wait(attempt) + if logger: + logger.warning( + "Attempt %d/%d failed: %s. Retrying in %.2fs...", + attempt + 1, + total_attempts, + exc, + delay, + exc_info=False, + ) + return delay + + def decorator(fn: Callable[P, R]) -> Callable[P, R]: + if inspect.iscoroutinefunction(fn): + @wraps(fn) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + for attempt in range(cfg.max_retries + 1): + try: + return await fn(*args, **kwargs) + except exceptions as exc: # type: ignore[misc] + wait_time = _calculate_wait_time(attempt, exc) + if wait_time is None: + raise + await asyncio.sleep(wait_time) + # This line should be unreachable if max_retries >= 0 + raise AssertionError("Retry loop exited unexpectedly.") + return async_wrapper + else: + @wraps(fn) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + for attempt in range(cfg.max_retries + 1): + try: + return fn(*args, **kwargs) + except exceptions as exc: # type: ignore[misc] + wait_time = _calculate_wait_time(attempt, exc) + if wait_time is None: + raise + time.sleep(wait_time) + # This line should be unreachable if max_retries >= 0 + raise AssertionError("Retry loop exited unexpectedly.") + return sync_wrapper + + return decorator diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py new file mode 100644 index 00000000..f39f917b --- /dev/null +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py @@ -0,0 +1,71 @@ +import re +from typing import Any, Iterable, Optional + + +def _to_seconds(v): + if v is None: + return None + try: + s = str(v).strip().lower() + # Support composite durations like "1h21m55s", as well as single-unit values + if any(u in s for u in ("h", "m", "s")): + total = 0.0 + for val, unit in re.findall(r"([0-9]+(?:\.[0-9]+)?)([hms])", s): + num = float(val) + if unit == "h": + total += num * 3600 + elif unit == "m": + total += num * 60 + else: # "s" + total += num + return total + # Fallback: plain number interpreted as seconds + return float(s) + except Exception: + return None + + +def _normalize_headers(raw_headers: Any) -> dict[str, str]: + """Return a lowercased dict[str, str] from httpx.Headers or mapping-like objects.""" + if not raw_headers: + return {} + try: + if hasattr(raw_headers, "items"): + items = list(raw_headers.items()) # works for dict-like and httpx.Headers + else: + items = list(dict(raw_headers).items()) + except Exception: + try: + items = list(dict(raw_headers).items()) + except Exception: + items = [] + out: dict[str, str] = {} + for k, v in items: + try: + out[str(k).lower()] = str(v) + except Exception: + continue + return out + + +def status_code_from_exception(exc: BaseException) -> Optional[int]: + resp = getattr(exc, "response", None) + return getattr(resp, "status_code", None) + + +def headers_from_exception(exc: BaseException) -> dict[str, str]: + resp = getattr(exc, "response", None) + raw = getattr(resp, "headers", None) + return _normalize_headers(raw) + + +def wait_from_rate_limit_headers( + headers: dict[str, str], + header_names: Iterable[str] = ("x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"), +) -> Optional[float]: + candidates = [] + for name in header_names: + sec = _to_seconds(headers.get(name)) + if sec is not None: + candidates.append(sec) + return max(candidates) if candidates else None diff --git a/services/frontend/apps/admin-app/Dockerfile b/services/frontend/apps/admin-app/Dockerfile index d00f9e25..ce069b34 100644 --- a/services/frontend/apps/admin-app/Dockerfile +++ b/services/frontend/apps/admin-app/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /usr/src/app COPY ./services/frontend/package*.json ./ -RUN npm install +RUN npm install --verbose COPY ./services/frontend . diff --git a/services/frontend/apps/chat-app/Dockerfile b/services/frontend/apps/chat-app/Dockerfile index 7fd60b75..7074c8a8 100644 --- a/services/frontend/apps/chat-app/Dockerfile +++ b/services/frontend/apps/chat-app/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /usr/src/app COPY ./services/frontend/package*.json ./ -RUN npm install +RUN npm install --verbose COPY ./services/frontend .