Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions apps/sim/app/api/mcp/serve/[serverId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { type AuthResult, checkHybridAuth } from '@/lib/auth/hybrid'
import { generateInternalToken } from '@/lib/auth/internal'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { SIM_VIA_HEADER } from '@/lib/execution/call-chain'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('WorkflowMcpServeAPI')
Expand Down Expand Up @@ -181,7 +182,8 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
serverId,
rpcParams as { name: string; arguments?: Record<string, unknown> },
executeAuthContext,
server.isPublic ? server.createdBy : undefined
server.isPublic ? server.createdBy : undefined,
request.headers.get(SIM_VIA_HEADER)
)

default:
Expand Down Expand Up @@ -244,7 +246,8 @@ async function handleToolsCall(
serverId: string,
params: { name: string; arguments?: Record<string, unknown> } | undefined,
executeAuthContext?: ExecuteAuthContext | null,
publicServerOwnerId?: string
publicServerOwnerId?: string,
simViaHeader?: string | null
): Promise<NextResponse> {
try {
if (!params?.name) {
Expand Down Expand Up @@ -300,6 +303,10 @@ async function handleToolsCall(
}
}

if (simViaHeader) {
headers[SIM_VIA_HEADER] = simViaHeader
}

logger.info(`Executing workflow ${tool.workflowId} via MCP tool ${params.name}`)

const response = await fetch(executeUrl, {
Expand Down
21 changes: 20 additions & 1 deletion apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import {
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
buildNextCallChain,
parseCallChain,
SIM_VIA_HEADER,
validateCallChain,
} from '@/lib/execution/call-chain'
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
Expand Down Expand Up @@ -155,17 +161,19 @@ type AsyncExecutionParams = {
input: any
triggerType: CoreTriggerType
executionId: string
callChain?: string[]
}

async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
const { requestId, workflowId, userId, input, triggerType, executionId } = params
const { requestId, workflowId, userId, input, triggerType, executionId, callChain } = params

const payload: WorkflowExecutionPayload = {
workflowId,
userId,
input,
triggerType,
executionId,
callChain,
}

try {
Expand Down Expand Up @@ -236,6 +244,14 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const requestId = generateRequestId()
const { id: workflowId } = await params

const incomingCallChain = parseCallChain(req.headers.get(SIM_VIA_HEADER))
const callChainError = validateCallChain(incomingCallChain)
if (callChainError) {
logger.warn(`[${requestId}] Call chain rejected for workflow ${workflowId}: ${callChainError}`)
return NextResponse.json({ error: callChainError }, { status: 409 })
}
const callChain = buildNextCallChain(incomingCallChain, workflowId)

try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
Expand Down Expand Up @@ -433,6 +449,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
input,
triggerType: loggingTriggerType,
executionId,
callChain,
})
}

Expand Down Expand Up @@ -539,6 +556,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
isClientSession,
enforceCredentialAccess: useAuthenticatedUserAsActor,
workflowStateOverride: effectiveWorkflowStateOverride,
callChain,
}

const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
Expand Down Expand Up @@ -909,6 +927,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
isClientSession,
enforceCredentialAccess: useAuthenticatedUserAsActor,
workflowStateOverride: effectiveWorkflowStateOverride,
callChain,
}

const sseExecutionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export type WorkflowExecutionPayload = {
triggerType?: CoreTriggerType
executionId?: string
metadata?: Record<string, any>
callChain?: string[]
}

/**
Expand Down Expand Up @@ -95,6 +96,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
useDraftState: false,
startTime: new Date().toISOString(),
isClientSession: false,
callChain: payload.callChain,
}

const snapshot = new ExecutionSnapshot(
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ export class DAGExecutor {
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
callChain: this.contextExtensions.callChain,
}

if (this.contextExtensions.resumeFromSnapshot) {
Expand Down
7 changes: 7 additions & 0 deletions apps/sim/executor/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface ExecutionMetadata {
parallels?: Record<string, any>
deploymentVersionId?: string
}
callChain?: string[]
}

export interface SerializableExecutionState {
Expand Down Expand Up @@ -167,6 +168,12 @@ export interface ContextExtensions {
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string

/**
* Ordered list of workflow IDs in the current call chain, used for cycle detection.
* Each hop appends the current workflow ID before making outgoing requests.
*/
callChain?: string[]
}

export interface WorkflowInput {
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/api/api-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class ApiBlockHandler implements BlockHandler {
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
enforceCredentialAccess: ctx.enforceCredentialAccess,
callChain: ctx.callChain,
},
},
false,
Expand Down
11 changes: 11 additions & 0 deletions apps/sim/executor/handlers/workflow/workflow-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { buildNextCallChain, validateCallChain } from '@/lib/execution/call-chain'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import type { TraceSpan } from '@/lib/logs/types'
Expand Down Expand Up @@ -167,6 +168,15 @@ export class WorkflowBlockHandler implements BlockHandler {
ctx.onChildWorkflowInstanceReady?.(effectiveBlockId, instanceId, iterationContext)
}

const childCallChain = buildNextCallChain(ctx.callChain || [], workflowId)
const depthError = validateCallChain(childCallChain)
if (depthError) {
throw new ChildWorkflowError({
message: depthError,
childWorkflowName,
})
}

const subExecutor = new Executor({
workflow: childWorkflow.serializedState,
workflowInput: childWorkflowInput,
Expand All @@ -180,6 +190,7 @@ export class WorkflowBlockHandler implements BlockHandler {
userId: ctx.userId,
executionId: ctx.executionId,
abortSignal: ctx.abortSignal,
callChain: childCallChain,
...(shouldPropagateCallbacks && {
onBlockStart: ctx.onBlockStart,
onBlockComplete: ctx.onBlockComplete,
Expand Down
6 changes: 6 additions & 0 deletions apps/sim/executor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ export interface ExecutionContext {
*/
stopAfterBlockId?: string

/**
* Ordered list of workflow IDs in the current call chain, used for cycle detection.
* Passed to outgoing HTTP requests via the X-Sim-Via header.
*/
callChain?: string[]

/**
* Counter for generating monotonically increasing execution order values.
* Starts at 0 and increments for each block. Use getNextExecutionOrder() to access.
Expand Down
130 changes: 130 additions & 0 deletions apps/sim/lib/execution/__tests__/call-chain.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import {
buildNextCallChain,
MAX_CALL_CHAIN_DEPTH,
parseCallChain,
SIM_VIA_HEADER,
serializeCallChain,
validateCallChain,
} from '@/lib/execution/call-chain'

describe('call-chain', () => {
describe('SIM_VIA_HEADER', () => {
it('has the expected header name', () => {
expect(SIM_VIA_HEADER).toBe('X-Sim-Via')
})
})

describe('MAX_CALL_CHAIN_DEPTH', () => {
it('equals 10', () => {
expect(MAX_CALL_CHAIN_DEPTH).toBe(10)
})
})

describe('parseCallChain', () => {
it('returns empty array for null', () => {
expect(parseCallChain(null)).toEqual([])
})

it('returns empty array for undefined', () => {
expect(parseCallChain(undefined)).toEqual([])
})

it('returns empty array for empty string', () => {
expect(parseCallChain('')).toEqual([])
})

it('returns empty array for whitespace-only string', () => {
expect(parseCallChain(' ')).toEqual([])
})

it('parses a single workflow ID', () => {
expect(parseCallChain('wf-abc')).toEqual(['wf-abc'])
})

it('parses multiple comma-separated workflow IDs', () => {
expect(parseCallChain('wf-a,wf-b,wf-c')).toEqual(['wf-a', 'wf-b', 'wf-c'])
})

it('trims whitespace around workflow IDs', () => {
expect(parseCallChain(' wf-a , wf-b , wf-c ')).toEqual(['wf-a', 'wf-b', 'wf-c'])
})

it('filters out empty segments', () => {
expect(parseCallChain('wf-a,,wf-b')).toEqual(['wf-a', 'wf-b'])
})
})

describe('serializeCallChain', () => {
it('serializes an empty array', () => {
expect(serializeCallChain([])).toBe('')
})

it('serializes a single ID', () => {
expect(serializeCallChain(['wf-a'])).toBe('wf-a')
})

it('serializes multiple IDs with commas', () => {
expect(serializeCallChain(['wf-a', 'wf-b', 'wf-c'])).toBe('wf-a,wf-b,wf-c')
})
})

describe('validateCallChain', () => {
it('returns null for an empty chain', () => {
expect(validateCallChain([])).toBeNull()
})

it('returns null when chain is under max depth', () => {
expect(validateCallChain(['wf-a', 'wf-b'])).toBeNull()
})

it('allows legitimate self-recursion', () => {
expect(validateCallChain(['wf-a', 'wf-a', 'wf-a'])).toBeNull()
})

it('returns depth error when chain is at max depth', () => {
const chain = Array.from({ length: MAX_CALL_CHAIN_DEPTH }, (_, i) => `wf-${i}`)
const error = validateCallChain(chain)
expect(error).toContain(
`Maximum workflow call chain depth (${MAX_CALL_CHAIN_DEPTH}) exceeded`
)
})

it('allows chain just under max depth', () => {
const chain = Array.from({ length: MAX_CALL_CHAIN_DEPTH - 1 }, (_, i) => `wf-${i}`)
expect(validateCallChain(chain)).toBeNull()
})
})

describe('buildNextCallChain', () => {
it('appends workflow ID to empty chain', () => {
expect(buildNextCallChain([], 'wf-a')).toEqual(['wf-a'])
})

it('appends workflow ID to existing chain', () => {
expect(buildNextCallChain(['wf-a', 'wf-b'], 'wf-c')).toEqual(['wf-a', 'wf-b', 'wf-c'])
})

it('does not mutate the original chain', () => {
const original = ['wf-a']
const result = buildNextCallChain(original, 'wf-b')
expect(original).toEqual(['wf-a'])
expect(result).toEqual(['wf-a', 'wf-b'])
})
})

describe('round-trip', () => {
it('parse → serialize is identity', () => {
const header = 'wf-a,wf-b,wf-c'
expect(serializeCallChain(parseCallChain(header))).toBe(header)
})

it('serialize → parse is identity', () => {
const chain = ['wf-a', 'wf-b', 'wf-c']
expect(parseCallChain(serializeCallChain(chain))).toEqual(chain)
})
})
})
51 changes: 51 additions & 0 deletions apps/sim/lib/execution/call-chain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Workflow call chain detection using the Via-style pattern.
*
* Prevents infinite execution loops when workflows call each other via API or
* MCP endpoints. Each hop appends the current workflow ID to the `X-Sim-Via`
* header; on ingress the chain is checked for depth.
*/

export const SIM_VIA_HEADER = 'X-Sim-Via'
export const MAX_CALL_CHAIN_DEPTH = 10

/**
* Parses the `X-Sim-Via` header value into an ordered list of workflow IDs.
* Returns an empty array when the header is absent or empty.
*/
export function parseCallChain(headerValue: string | null | undefined): string[] {
if (!headerValue || !headerValue.trim()) {
return []
}
return headerValue
.split(',')
.map((id) => id.trim())
.filter(Boolean)
}

/**
* Serializes a call chain array back into the header value format.
*/
export function serializeCallChain(chain: string[]): string {
return chain.join(',')
}

/**
* Validates that the call chain has not exceeded the maximum depth.
* Returns an error message string if invalid, or `null` if the chain is
* safe to extend.
*/
export function validateCallChain(chain: string[]): string | null {
if (chain.length >= MAX_CALL_CHAIN_DEPTH) {
return `Maximum workflow call chain depth (${MAX_CALL_CHAIN_DEPTH}) exceeded.`
}

return null
}

/**
* Builds the next call chain by appending the current workflow ID.
*/
export function buildNextCallChain(chain: string[], workflowId: string): string[] {
return [...chain, workflowId]
}
1 change: 1 addition & 0 deletions apps/sim/lib/workflows/executor/execution-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ export async function executeWorkflowCore(
base64MaxBytes,
stopAfterBlockId: resolvedStopAfterBlockId,
onChildWorkflowInstanceReady,
callChain: metadata.callChain,
}

const executorInstance = new Executor({
Expand Down
Loading