Skip to content

Commit 02b9172

Browse files
authored
LLM Chain: Added foundation for chain execution (#616)
1 parent ab46e3d commit 02b9172

20 files changed

Lines changed: 2471 additions & 258 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
"""Create llm_chain table
2+
3+
Revision ID: 048
4+
Revises: 047
5+
Create Date: 2026-02-20 00:00:00.000000
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
from sqlalchemy.dialects.postgresql import JSONB
12+
13+
# Alembic revision identifiers: this migration is 048, applied on top of 047.
revision = "048"
down_revision = "047"
branch_labels = None
depends_on = None
17+
18+
19+
def upgrade() -> None:
    """Introduce chain-execution support.

    1. Creates the ``llm_chain`` table (one row per chain job).
    2. Adds a nullable ``chain_id`` FK on ``llm_call`` so individual calls
       can be attributed to the chain that produced them.
    3. Registers the ``LLM_CHAIN`` value on the ``jobtype`` enum.
    """
    chain_columns = [
        sa.Column("id", sa.Uuid(), nullable=False,
                  comment="Unique identifier for the LLM chain record"),
        sa.Column("job_id", sa.Uuid(), nullable=False,
                  comment="Reference to the parent job (status tracked in job table)"),
        sa.Column("project_id", sa.Integer(), nullable=False,
                  comment="Reference to the project this LLM call belongs to"),
        sa.Column("organization_id", sa.Integer(), nullable=False,
                  comment="Reference to the organization this LLM call belongs to"),
        sa.Column("status", sa.String(), nullable=False, server_default="pending",
                  comment="Chain execution status (pending, running, failed, completed)"),
        sa.Column("error", sa.Text(), nullable=True,
                  comment="Error message if the chain execution failed"),
        sa.Column("block_sequences", JSONB(), nullable=True,
                  comment="Ordered list of llm_call UUIDs as blocks complete"),
        sa.Column("total_blocks", sa.Integer(), nullable=False,
                  comment="Total number of blocks to execute"),
        sa.Column("number_of_blocks_processed", sa.Integer(), nullable=False,
                  server_default="0",
                  comment="Number of blocks processed so far (used for tracking progress)"),
        sa.Column("input", sa.String(), nullable=False,
                  comment="First block user's input - text string, binary data, or file path for multimodal"),
        sa.Column("output", JSONB(), nullable=True,
                  comment="Last block's final output (set on chain completion)"),
        sa.Column("configs", JSONB(), nullable=True,
                  comment="Ordered list of block configs as submitted in the request"),
        sa.Column("total_usage", JSONB(), nullable=True,
                  comment="Aggregated token usage: {input_tokens, output_tokens, total_tokens}"),
        sa.Column("metadata", JSONB(), nullable=True,
                  comment="Future-proof extensibility catch-all"),
        sa.Column("inserted_at", sa.DateTime(), nullable=False,
                  comment="Timestamp when the chain record was created"),
        sa.Column("updated_at", sa.DateTime(), nullable=False,
                  comment="Timestamp when the chain record was last updated"),
    ]
    op.create_table(
        "llm_chain",
        *chain_columns,
        sa.PrimaryKeyConstraint("id"),
        sa.ForeignKeyConstraint(["job_id"], ["job.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(
            ["organization_id"], ["organization.id"], ondelete="CASCADE"
        ),
    )

    # Chains are looked up by their owning job.
    op.create_index("idx_llm_chain_job_id", "llm_chain", ["job_id"])

    # Link individual llm_call rows back to their parent chain.
    op.add_column(
        "llm_call",
        sa.Column(
            "chain_id",
            sa.Uuid(),
            nullable=True,
            comment="Reference to the parent chain (NULL for standalone /llm/call requests)",
        ),
    )
    op.create_foreign_key(
        "fk_llm_call_chain_id",
        "llm_call",
        "llm_chain",
        ["chain_id"],
        ["id"],
        ondelete="SET NULL",
    )
    # Partial index: only rows that actually belong to a chain are indexed,
    # keeping the index small when most calls are standalone.
    op.create_index(
        "idx_llm_call_chain_id",
        "llm_call",
        ["chain_id"],
        postgresql_where=sa.text("chain_id IS NOT NULL"),
    )

    # Idempotent enum extension. NOTE(review): this cannot be reversed in
    # downgrade() — PostgreSQL does not support dropping enum values.
    op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_CHAIN'")
161+
162+
163+
def downgrade() -> None:
    """Reverse the llm_chain migration.

    Drops the chain_id link from llm_call first (index, FK constraint, then
    the column — in dependency order), and only then removes the llm_chain
    table and its index.

    NOTE(review): the 'LLM_CHAIN' value added to the jobtype enum in
    upgrade() is intentionally not removed here — PostgreSQL cannot drop
    enum values.
    """
    op.drop_index("idx_llm_call_chain_id", table_name="llm_call")
    op.drop_constraint("fk_llm_call_chain_id", "llm_call", type_="foreignkey")
    op.drop_column("llm_call", "chain_id")

    op.drop_index("idx_llm_chain_job_id", table_name="llm_chain")
    op.drop_table("llm_chain")
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
Execute a chain of LLM calls sequentially, where each block's output becomes the next block's input.
2+
3+
This endpoint initiates an asynchronous LLM chain job. The request is queued
4+
for processing, and results are delivered via the callback URL when complete.
5+
6+
### Key Parameters
7+
8+
**`query`** (required) - Initial query input for the first block in the chain:
9+
- `input` (required): User question/prompt/query — accepts a plain string, a structured input object (`text`, `audio`, `image`, `pdf`), or a list of structured inputs
10+
- `conversation` (optional, object): Conversation configuration
11+
- `id` (optional, string): Existing conversation ID to continue
12+
- `auto_create` (optional, boolean, default false): Create new conversation if no ID provided
13+
- **Note**: Cannot specify both `id` and `auto_create=true`
14+
15+
16+
**`blocks`** (required, array, min 1 block) - Ordered list of blocks to execute sequentially. Each block contains:
17+
18+
- `config` (required) - Configuration for this block's LLM call (just choose one mode):
19+
20+
- **Mode 1: Stored Configuration**
21+
- `id` (UUID): Configuration ID
22+
- `version` (integer >= 1): Version number
23+
- **Both required together**
24+
- **Note**: When using stored configuration, do not include the `blob` field in the request body
25+
26+
- **Mode 2: Ad-hoc Configuration**
27+
- `blob` (object): Complete configuration object
28+
- `completion` (required, object): Completion configuration
29+
- `provider` (required, string): Kaapi providers (`openai`, `google`, `sarvamai`) — params are validated and mapped internally. Native providers (`openai-native`, `google-native`, `sarvamai-native`) — params are passed through as-is
30+
- `type` (required, string): Completion type — `"text"`, `"stt"`, or `"tts"`
31+
- `params` (required, object): Parameters structure depends on provider and type (see schema for detailed structure)
32+
- `input_guardrails` (optional, array): Guardrails applied to validate/sanitize input before the LLM call
33+
- `output_guardrails` (optional, array): Guardrails applied to validate/sanitize output after the LLM call
34+
- `prompt_template` (optional, object): Template for text interpolation
35+
- `template` (required, string): Template string with `{{input}}` placeholder — replaced with the block's input before execution
36+
- **Note**
37+
- When using ad-hoc configuration, do not include `id` and `version` fields
38+
- When using the Kaapi abstraction, parameters that are not supported by the selected provider or model are automatically suppressed. If any parameters are ignored, a list of warnings is included in `metadata.warnings`.
39+
- **Recommendation**: Use stored configs (Mode 1) for production; use ad-hoc configs only for testing/validation
40+
- **Schema**: Check the API schema or examples below for the complete parameter structure for each provider type
41+
42+
- `include_provider_raw_response` (optional, boolean, default false):
43+
- When true, includes the unmodified raw response from the LLM provider for this block
44+
45+
- `intermediate_callback` (optional, boolean, default false):
46+
- When true, sends an intermediate callback after this block completes with the block's response, usage, and position in the chain
47+
48+
**`callback_url`** (optional, HTTPS URL):
49+
- Webhook endpoint to receive the final response and intermediate callbacks
50+
- Must be a valid HTTPS URL
51+
- If not provided, response is only accessible through job status
52+
53+
**`request_metadata`** (optional, object):
54+
- Custom JSON metadata
55+
- Passed through unchanged in the response
56+
57+
### Note
58+
- If any block fails, the chain stops immediately — no subsequent blocks are executed
59+
- A `warnings` list is automatically added to the response metadata when using Kaapi configs if any parameters are suppressed or adjusted (e.g., temperature on reasoning models)
60+
61+
---

backend/app/api/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
login,
1111
languages,
1212
llm,
13+
llm_chain,
1314
organization,
1415
openai_conversation,
1516
project,
@@ -41,6 +42,7 @@
4142
api_router.include_router(evaluations.router)
4243
api_router.include_router(languages.router)
4344
api_router.include_router(llm.router)
45+
api_router.include_router(llm_chain.router)
4446
api_router.include_router(login.router)
4547
api_router.include_router(onboarding.router)
4648
api_router.include_router(openai_conversation.router)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import logging
2+
3+
from fastapi import APIRouter, Depends
4+
from app.api.deps import AuthContextDep, SessionDep
5+
from app.api.permissions import Permission, require_permission
6+
from app.models import LLMChainRequest, LLMChainResponse, Message
7+
from app.services.llm.jobs import start_chain_job
8+
from app.utils import APIResponse, validate_callback_url, load_description
9+
10+
# Module-level logger, keyed to this module's import path.
logger = logging.getLogger(__name__)

# Primary router exposing the /llm/chain endpoint under the "LLM" tag.
router = APIRouter(tags=["LLM"])
# Documentation-only router: its routes are attached via `callbacks=` on the
# endpoint so the OpenAPI spec describes the webhook payload.
llm_callback_router = APIRouter()
14+
15+
16+
@llm_callback_router.post("{$callback_url}", name="llm_chain_callback")
def llm_callback_notification(body: APIResponse[LLMChainResponse]):
    """Schema-only description of the webhook sent on LLM chain completion.

    The caller's `callback_url` receives:
    - success=True with an LLMChainResponse payload in `data` on success
    - success=False with an error message on failure
    - the `metadata` field, whenever it was provided in the original request
    """
    ...
30+
31+
32+
@router.post(
    "/llm/chain",
    description=load_description("llm/llm_chain.md"),
    response_model=APIResponse[Message],
    callbacks=llm_callback_router.routes,
    dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))],
)
def llm_chain(
    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
):
    """Queue an LLM chain for asynchronous execution.

    Validates the callback URL (when supplied), hands the request off to the
    background job machinery, and immediately returns an acknowledgement;
    the actual result is delivered via the callback.
    """
    # Fail fast on a bad callback URL before anything is enqueued.
    if request.callback_url:
        validate_callback_url(str(request.callback_url))

    ctx_project = _current_user.project_
    ctx_org = _current_user.organization_
    start_chain_job(
        db=_session,
        request=request,
        project_id=ctx_project.id,
        organization_id=ctx_org.id,
    )

    ack = Message(
        message="Your response is being generated and will be delivered via callback."
    )
    return APIResponse.success_response(data=ack)

backend/app/crud/llm.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def create_llm_call(
4848
*,
4949
request: LLMCallRequest,
5050
job_id: UUID,
51+
chain_id: UUID | None = None,
5152
project_id: int,
5253
organization_id: int,
5354
resolved_config: ConfigBlob,
@@ -128,6 +129,7 @@ def create_llm_call(
128129
job_id=job_id,
129130
project_id=project_id,
130131
organization_id=organization_id,
132+
chain_id=chain_id,
131133
input=serialize_input(request.query.input),
132134
input_type=input_type,
133135
output_type=output_type,

0 commit comments

Comments
 (0)