Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions infrastructure/rag/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 15.0.7
- name: keydb
repository: https://enapter.github.io/charts/
version: 0.48.0
- name: ollama
repository: https://otwld.github.io/ollama-helm/
version: 1.24.0
digest: sha256:30cc964388864bd738eb0668367576dbad45eb9d6a885cdc48ef7c07c9692aa0
generated: "2025-08-04T09:23:39.885193+02:00"
digest: sha256:2d297b86ec5ea607319c691551dd99b658c03c5ba1ca9884b9b1ede4c0c445a8
generated: "2025-08-15T15:23:26.079292+02:00"
5 changes: 0 additions & 5 deletions infrastructure/rag/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ dependencies:
repository: https://charts.bitnami.com/bitnami
version: "15.0.7"
condition: features.minio.enabled
- name: keydb
alias: keydb
repository: https://enapter.github.io/charts/
version: "0.48.0"
condition: features.keydb.enabled
- name: ollama
alias: ollama
version: 1.24.0
Expand Down
32 changes: 15 additions & 17 deletions infrastructure/rag/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ features:
enabled: true
frontend:
enabled: true
keydb:
enabled: true
mcp:
enabled: true

Expand Down Expand Up @@ -70,15 +68,15 @@ backend:
image:
repository: ghcr.io/stackitcloud/rag-template/mcp-server
pullPolicy: Always
tag: "latest"
tag: "v2.1.0"

name: backend
replicaCount: 1

image:
repository: ghcr.io/stackitcloud/rag-template/rag-backend
pullPolicy: Always
tag: "latest"
tag: "v2.1.0"

command:
- "poetry"
Expand Down Expand Up @@ -192,6 +190,9 @@ backend:
stackitEmbedder:
STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct"
STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1
STACKIT_EMBEDDER_MAX_RETRIES: 10
STACKIT_EMBEDDER_RETRY_BASE_DELAY: 1.0
STACKIT_EMBEDDER_RETRY_MAX_DELAY: 600.0
ollama:
OLLAMA_MODEL: "llama3.2:3b-instruct-fp16"
OLLAMA_BASE_URL: "http://rag-ollama:11434"
Expand All @@ -216,7 +217,7 @@ frontend:
image:
repository: ghcr.io/stackitcloud/rag-template/frontend
pullPolicy: Always
tag: "latest"
tag: "v2.1.0"

service:
type: ClusterIP
Expand Down Expand Up @@ -251,7 +252,7 @@ adminBackend:
image:
repository: ghcr.io/stackitcloud/rag-template/admin-backend
pullPolicy: Always
tag: "latest"
tag: "v2.1.0"

command:
- "poetry"
Expand Down Expand Up @@ -321,7 +322,9 @@ adminBackend:
CHUNKER_OVERLAP: 300
keyValueStore:
USECASE_KEYVALUE_PORT: 6379
USECASE_KEYVALUE_HOST: "rag-keydb"
USECASE_KEYVALUE_HOST: "rag-redis-primary"
USECASE_KEYVALUE_USERNAME: "default"
USECASE_KEYVALUE_PASSWORD: "MOqTsoa22R"
Comment on lines +325 to +327
Copy link

Copilot AI Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded password in configuration file poses a security risk. Consider using Kubernetes secrets or environment variable references instead of plain text passwords in the values file.

Suggested change
USECASE_KEYVALUE_HOST: "rag-redis-primary"
USECASE_KEYVALUE_USERNAME: "default"
USECASE_KEYVALUE_PASSWORD: "MOqTsoa22R"
USECASE_KEYVALUE_PASSWORD: "${USECASE_KEYVALUE_PASSWORD}"

Copilot uses AI. Check for mistakes.
sourceUploader:
SOURCE_UPLOADER_TIMEOUT: 3600

Expand All @@ -331,7 +334,7 @@ extractor:
image:
repository: ghcr.io/stackitcloud/rag-template/document-extractor
pullPolicy: Always
tag: "latest"
tag: "v2.1.0"

command:
- "poetry"
Expand Down Expand Up @@ -379,7 +382,7 @@ adminFrontend:
image:
repository: ghcr.io/stackitcloud/rag-template/admin-frontend
pullPolicy: Always
tag: "latest"
tag: "v2.1.0"

service:
type: ClusterIP
Expand Down Expand Up @@ -526,12 +529,12 @@ langfuse:

# Redis Configuration (external KeyDB)
redis:
deploy: false
host: "rag-keydb"
deploy: true
host: "rag-redis-primary"
port: 6379
auth:
username: "default"
password: ""
password: "MOqTsoa22R"
Comment on lines 536 to +537
Copy link

Copilot AI Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded password in langfuse Redis configuration. This should be moved to a Kubernetes secret or use a secure configuration management approach.

Copilot uses AI. Check for mistakes.

# ClickHouse Configuration (external ClickHouse)
clickhouse:
Expand Down Expand Up @@ -617,8 +620,3 @@ ollama:
qdrant:
image:
tag: v1.14.1

keydb:
multiMaster: "no"
activeReplicas: "no"
nodes: 1
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, settings: KeyValueSettings):
settings : KeyValueSettings
The settings object containing the host and port information for the Redis connection.
"""
self._redis = Redis(host=settings.host, port=settings.port, decode_responses=True)
self._redis = Redis(host=settings.host, port=settings.port, username=settings.username, password=settings.password, decode_responses=True)

@staticmethod
def _to_str(file_name: str, file_status: Status) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ class Config:

host: str = Field()
port: int = Field()
username: str = Field()
password: str = Field()
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import logging
import traceback
from typing import Optional

from langchain.text_splitter import RecursiveCharacterTextSplitter
Expand All @@ -16,6 +15,7 @@
)
from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff

logger = logging.getLogger(__name__)

Expand All @@ -28,8 +28,6 @@ class LangchainSummarizer(Summarizer):
document and retries the summarization process if an error occurs.
"""

RETRY_WAIT_TIME = 10

def __init__(
self,
langfuse_manager: LangfuseManager,
Expand Down Expand Up @@ -68,45 +66,42 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
"""
assert query, "Query is empty: %s" % query # noqa S101
config = ensure_config(config)
tries_remaining = config.get("configurable", {}).get("tries_remaining", 3)
logger.debug("Tries remaining %d" % tries_remaining)

if tries_remaining < 0:
raise Exception("Summary creation failed.")
document = Document(page_content=query)
langchain_documents = self._chunker.split_documents([document])
logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))

outputs = []
for langchain_document in langchain_documents:
async with self._semaphore:
try:
result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
# Extract content from AIMessage if it's not already a string
content = result.content if hasattr(result, "content") else str(result)
outputs.append(content)
except Exception as e:
logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc())
config["tries_remaining"] = tries_remaining - 1
if "rate limit" in str(e).lower() or "ratelimit" in str(e).lower():
logger.warning(
"Rate limit encountered, waiting %d seconds before retry...", self.RETRY_WAIT_TIME
)
await asyncio.sleep(self.RETRY_WAIT_TIME)
result = await self.ainvoke(query, config)
# Extract content from AIMessage if it's not already a string
content = result.content if hasattr(result, "content") else str(result)
outputs.append(content)
# Fan out with concurrency, bounded by your semaphore inside _summarize_chunk
tasks = [
asyncio.create_task(self._summarize_chunk(doc.page_content, config))
for doc in langchain_documents
]
outputs = await asyncio.gather(*tasks)

if len(outputs) == 1:
return outputs[0]
summary = " ".join(outputs)

# Optional single reduce pass (no recursion)
merged = " ".join(outputs)
logger.debug(
"Reduced number of chars from %d to %d"
% (len("".join([x.page_content for x in langchain_documents])), len(summary))
"Reduced number of chars from %d to %d",
len("".join([x.page_content for x in langchain_documents])),
len(merged),
)
return await self.ainvoke(summary, config)
return await self._summarize_chunk(merged, config)

def _create_chain(self) -> Runnable:
return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
self.__class__.__name__
return (
self._langfuse_manager.get_base_prompt(self.__class__.__name__)
| self._langfuse_manager.get_base_llm(self.__class__.__name__)
)

@retry_with_backoff(logger=logger)
async def _summarize_chunk_impl(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
response = await self._create_chain().ainvoke({"text": text}, config)
return response.content if hasattr(response, "content") else str(response)

async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
# Hold the semaphore for the entire retry lifecycle
async with self._semaphore:
return await self._summarize_chunk_impl(text, config)
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
"""Module that contains the StackitEmbedder class."""

import logging

from langchain_core.embeddings import Embeddings
from openai import OpenAI
from openai import OpenAI, APIError, APITimeoutError, RateLimitError, APIConnectionError

from rag_core_api.embeddings.embedder import Embedder
from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings
from rag_core_lib.impl.utils.retry_decorator import RetrySettings, retry_with_backoff


logger = logging.getLogger(__name__)


class StackitEmbedder(Embedder, Embeddings):
"""A class that represents any Langchain provided Embedder."""

ATTEMPT_CAP = 10
BACKOFF_FACTOR = 2
JITTER_MIN_SECONDS = 0.05
JITTER_MAX_SECONDS = 0.25

def __init__(self, stackit_embedder_settings: StackitEmbedderSettings):
"""
Initialize the StackitEmbedder with the given settings.
Expand All @@ -36,24 +47,36 @@ def get_embedder(self) -> "StackitEmbedder":

def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""
Embed a list of documents into numerical vectors.
Embed a batch of texts with exponential backoff retry logic. Batching is handled by the vector
database client.

Parameters
----------
texts : list of str
A list of documents to be embedded.
texts : list[str]
A batch of texts to be embedded.

Returns
-------
list[list[float]]
A list where each element is a list of floats representing the embedded vector of a document.
A list of embeddings for the batch.

Raises
------
Exception
If all retry attempts fail.
"""
responses = self._client.embeddings.create(
input=texts,
model=self._settings.model,
)
if not texts:
return []

@self._retry_wrapper()
def _call(batch: list[str]) -> list[list[float]]:
response = self._client.embeddings.with_raw_response.create(
input=batch,
model=self._settings.model,
)
return [data.embedding for data in response.parse().data]

return [data.embedding for data in responses.data]
return _call(texts)

def embed_query(self, text: str) -> list[float]:
"""
Expand All @@ -69,4 +92,26 @@ def embed_query(self, text: str) -> list[float]:
list[float]
The embedded representation of the query text.
"""
return self.embed_documents([text])[0]
embeddings_list = self.embed_documents([text])
if embeddings_list:
embeddings = embeddings_list[0]
return embeddings if embeddings else []
return embeddings_list

def _retry_wrapper(self):
"""Build a retry decorator *with runtime settings* from self._settings."""
return retry_with_backoff(
settings=RetrySettings(
max_retries=self._settings.max_retries,
retry_base_delay=self._settings.retry_base_delay,
retry_max_delay=self._settings.retry_max_delay,
backoff_factor=self.BACKOFF_FACTOR,
attempt_cap=self.ATTEMPT_CAP,
jitter_min=self.JITTER_MIN_SECONDS,
jitter_max=self.JITTER_MAX_SECONDS,
),
exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError),
rate_limit_exceptions=(RateLimitError,),
logger=logger,
)

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ class StackitEmbedderSettings(BaseSettings):
(default "https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1").
api_key : str
The API key for authentication.
max_retries : int
Maximum number of retry attempts (default 10).
retry_base_delay : float
Base delay in seconds for exponential backoff (default 1.0).
retry_max_delay : float
Maximum delay in seconds between retries (default 60.0).
"""

class Config:
Expand All @@ -28,3 +34,6 @@ class Config:
model: str = Field(default="intfloat/e5-mistral-7b-instruct")
base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1")
api_key: str = Field(default="")
max_retries: int = Field(default=10, description="Maximum number of retry attempts")
retry_base_delay: float = Field(default=1.0, description="Base delay in seconds for exponential backoff")
retry_max_delay: float = Field(default=600.0, description="Maximum delay in seconds between retries")
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module contains settings regarding the STACKIT vLLM."""

from typing import Literal
from pydantic import Field
from pydantic_settings import BaseSettings

Expand Down Expand Up @@ -34,3 +35,5 @@ class Config:

top_p: float = Field(default=0.1, title="LLM Top P")
temperature: float = Field(default=0, title="LLM Temperature")
# Always include response headers; disallow overriding via env by constraining to Literal[True]
include_response_headers: Literal[True] = Field(default=True, title="Include Response Headers")
Loading