Skip to content

Commit 78e7337

Browse files
Add execute_workflow_and_wait + workflow execution examples (#451)
* Add execute_workflow_and_wait feature * Add workflow execution examples * Fix SDK generation issue * Do not include deprecated task-queue param anymore
1 parent 6f3e24f commit 78e7337

7 files changed

Lines changed: 343 additions & 2 deletions

File tree

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#!/usr/bin/env python
2+
3+
import asyncio
4+
import os
5+
6+
from mistralai.client import Mistral
7+
8+
WORKFLOW_NAME = "example-hello-world-workflow"
9+
10+
11+
async def main():
12+
api_key = os.environ["MISTRAL_API_KEY"]
13+
14+
client = Mistral(api_key=api_key)
15+
16+
# Execute workflow and wait for result using wait_for_result parameter
17+
response = await client.workflows.execute_workflow_async(
18+
workflow_identifier=WORKFLOW_NAME,
19+
input={"document_title": "hello world"},
20+
wait_for_result=True,
21+
timeout_seconds=60.0,
22+
)
23+
24+
print(f"Workflow: {response.workflow_name}")
25+
print(f"Execution ID: {response.execution_id}")
26+
print(f"Result: {response.result}")
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#!/usr/bin/env python
2+
3+
import asyncio
4+
import os
5+
6+
from mistralai.client import Mistral
7+
8+
WORKFLOW_NAME = "example-hello-world-workflow"
9+
10+
11+
async def main():
12+
api_key = os.environ["MISTRAL_API_KEY"]
13+
14+
client = Mistral(api_key=api_key)
15+
16+
# Example 1: Using API sync mode (server-side waiting)
17+
result = await client.workflows.execute_workflow_and_wait_async(
18+
workflow_identifier=WORKFLOW_NAME,
19+
input={"document_title": "hello world"},
20+
use_api_sync=True,
21+
timeout_seconds=60.0,
22+
)
23+
print(f"Result (API sync): {result}")
24+
25+
# Example 2: Using polling mode (client-side waiting)
26+
result = await client.workflows.execute_workflow_and_wait_async(
27+
workflow_identifier=WORKFLOW_NAME,
28+
input={"document_title": "hello world"},
29+
use_api_sync=False,
30+
polling_interval=5,
31+
max_attempts=12, # 12 attempts * 5 seconds = 60 seconds max
32+
)
33+
print(f"Result (polling): {result}")
34+
35+
36+
if __name__ == "__main__":
37+
asyncio.run(main())
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python
2+
3+
import os
4+
5+
from mistralai.client import Mistral
6+
7+
WORKFLOW_NAME = "example-hello-world-workflow"
8+
9+
10+
def main():
11+
api_key = os.environ["MISTRAL_API_KEY"]
12+
13+
client = Mistral(api_key=api_key)
14+
15+
# Execute workflow and wait for result using wait_for_result parameter
16+
response = client.workflows.execute_workflow(
17+
workflow_identifier=WORKFLOW_NAME,
18+
input={"document_title": "hello world"},
19+
wait_for_result=True,
20+
timeout_seconds=60.0,
21+
)
22+
23+
print(f"Workflow: {response.workflow_name}")
24+
print(f"Execution ID: {response.execution_id}")
25+
print(f"Result: {response.result}")
26+
27+
28+
if __name__ == "__main__":
29+
main()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#!/usr/bin/env python
2+
3+
import os
4+
5+
from mistralai.client import Mistral
6+
7+
WORKFLOW_NAME = "example-hello-world-workflow"
8+
9+
10+
def main():
11+
api_key = os.environ["MISTRAL_API_KEY"]
12+
13+
client = Mistral(api_key=api_key)
14+
15+
# Example 1: Using API sync mode (server-side waiting)
16+
result = client.workflows.execute_workflow_and_wait(
17+
workflow_identifier=WORKFLOW_NAME,
18+
input={"document_title": "hello world"},
19+
use_api_sync=True,
20+
timeout_seconds=60.0,
21+
)
22+
print(f"Result (API sync): {result}")
23+
24+
# Example 2: Using polling mode (client-side waiting)
25+
result = client.workflows.execute_workflow_and_wait(
26+
workflow_identifier=WORKFLOW_NAME,
27+
input={"document_title": "hello world"},
28+
use_api_sync=False,
29+
polling_interval=5,
30+
max_attempts=12, # 12 attempts * 5 seconds = 60 seconds max
31+
)
32+
print(f"Result (polling): {result}")
33+
34+
35+
if __name__ == "__main__":
36+
main()

scripts/run_examples.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ exclude_files=(
4949
"examples/mistral/audio/async_realtime_transcription_microphone.py"
5050
"examples/mistral/audio/async_realtime_transcription_stream.py"
5151
"examples/mistral/audio/async_realtime_transcription_dual_delay_microphone.py"
52+
"examples/mistral/workflows/workflow_execute_and_wait.py"
53+
"examples/mistral/workflows/async_workflow_execute_and_wait.py"
54+
"examples/mistral/workflows/workflow_execute.py"
55+
"examples/mistral/workflows/async_workflow_execute.py"
5256
)
5357

5458
# Files that require extra dependencies (agents, mcp, audio, etc.)

src/mistralai/client/_version.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
import importlib.metadata
55

66
__title__: str = "mistralai"
7-
__version__: str = "2.2.2"
7+
__version__: str = "2.2.3"
88
__openapi_doc_version__: str = "1.0.0"
99
__gen_version__: str = "2.862.0"
10-
__user_agent__: str = "speakeasy-sdk/python 2.2.2 2.862.0 1.0.0 mistralai"
10+
__user_agent__: str = "speakeasy-sdk/python 2.2.3 2.862.0 1.0.0 mistralai"
1111

1212
try:
1313
if __package__ is not None:

src/mistralai/client/workflows.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
from typing import Any, Awaitable, Dict, List, Mapping, Optional, Union
2020
from typing_extensions import deprecated
2121

22+
# region imports
23+
import asyncio
24+
import time
25+
# endregion imports
26+
2227

2328
class Workflows(BaseSDK):
2429
executions: Executions
@@ -49,6 +54,206 @@ def _init_sdks(self):
4954
self.sdk_configuration, parent_ref=self.parent_ref
5055
)
5156

57+
# region sdk-class-body
58+
def execute_workflow_and_wait(
59+
self,
60+
workflow_identifier: str,
61+
input: OptionalNullable[Dict[str, Any]] = UNSET,
62+
execution_id: OptionalNullable[str] = UNSET,
63+
deployment_name: OptionalNullable[str] = UNSET,
64+
custom_tracing_attributes: OptionalNullable[Dict[str, str]] = UNSET,
65+
polling_interval: int = 5,
66+
max_attempts: Optional[int] = None,
67+
use_api_sync: bool = False,
68+
timeout_seconds: OptionalNullable[float] = UNSET,
69+
) -> Any:
70+
"""Execute a workflow and wait for its completion.
71+
72+
Args:
73+
workflow_identifier: The workflow name or ID.
74+
input: Input parameters for the workflow
75+
execution_id: Optional custom execution ID
76+
deployment_name: Name of the deployment to route this execution to
77+
custom_tracing_attributes: Custom tracing attributes
78+
polling_interval: Seconds between status checks when polling
79+
max_attempts: Maximum number of polling attempts when polling (None for unlimited)
80+
use_api_sync: Whether to use the API's built-in sync execution capability
81+
timeout_seconds: Maximum time to wait in seconds when using API sync
82+
83+
Returns:
84+
The workflow result directly
85+
86+
Raises:
87+
TimeoutError: If max_attempts is reached and workflow is still running
88+
RuntimeError: If workflow fails or terminates abnormally
89+
"""
90+
if use_api_sync:
91+
# Use the API's built-in synchronous execution
92+
response = self.execute_workflow(
93+
workflow_identifier=workflow_identifier,
94+
input=input,
95+
execution_id=execution_id,
96+
wait_for_result=True,
97+
timeout_seconds=timeout_seconds,
98+
custom_tracing_attributes=custom_tracing_attributes,
99+
deployment_name=deployment_name,
100+
)
101+
return response.result
102+
# Use polling method
103+
execution = self.execute_workflow(
104+
workflow_identifier=workflow_identifier,
105+
input=input,
106+
execution_id=execution_id,
107+
custom_tracing_attributes=custom_tracing_attributes,
108+
deployment_name=deployment_name,
109+
)
110+
111+
# Wait for completion
112+
final_execution = self._wait_for_workflow_completion(
113+
execution.execution_id, polling_interval, max_attempts
114+
)
115+
116+
return final_execution.result
117+
118+
def _wait_for_workflow_completion(
119+
self,
120+
execution_id: str,
121+
polling_interval: int = 5,
122+
max_attempts: Optional[int] = None,
123+
) -> models.WorkflowExecutionResponse:
124+
"""Wait for a workflow to complete by polling its status.
125+
126+
Args:
127+
execution_id: Execution ID of the workflow
128+
polling_interval: Seconds between status checks
129+
max_attempts: Maximum number of polling attempts (None for unlimited)
130+
131+
Returns:
132+
WorkflowExecutionResponse with the final execution details
133+
134+
Raises:
135+
TimeoutError: If max_attempts is reached and workflow is still running
136+
RuntimeError: If workflow fails or terminates abnormally
137+
"""
138+
attempts = 0
139+
while True:
140+
response = self.executions.get_workflow_execution(execution_id=execution_id)
141+
142+
if response.status != "RUNNING":
143+
if response.status == "COMPLETED":
144+
return response
145+
raise RuntimeError(f"Workflow failed with status: {response.status}")
146+
147+
attempts += 1
148+
if max_attempts is not None and attempts >= max_attempts:
149+
raise TimeoutError(
150+
f"Workflow is still running after {max_attempts} polling attempts"
151+
)
152+
153+
time.sleep(polling_interval)
154+
155+
async def execute_workflow_and_wait_async(
156+
self,
157+
workflow_identifier: str,
158+
input: OptionalNullable[Dict[str, Any]] = UNSET,
159+
execution_id: OptionalNullable[str] = UNSET,
160+
deployment_name: OptionalNullable[str] = UNSET,
161+
custom_tracing_attributes: OptionalNullable[Dict[str, str]] = UNSET,
162+
polling_interval: int = 5,
163+
max_attempts: Optional[int] = None,
164+
use_api_sync: bool = False,
165+
timeout_seconds: OptionalNullable[float] = UNSET,
166+
) -> Any:
167+
"""Execute a workflow and wait for its completion (async version).
168+
169+
Args:
170+
workflow_identifier: The workflow name or ID.
171+
input: Input parameters for the workflow
172+
execution_id: Optional custom execution ID
173+
deployment_name: Name of the deployment to route this execution to
174+
custom_tracing_attributes: Custom tracing attributes
175+
polling_interval: Seconds between status checks when polling
176+
max_attempts: Maximum number of polling attempts when polling (None for unlimited)
177+
use_api_sync: Whether to use the API's built-in sync execution capability
178+
timeout_seconds: Maximum time to wait in seconds when using API sync
179+
180+
Returns:
181+
The workflow result directly
182+
183+
Raises:
184+
TimeoutError: If max_attempts is reached and workflow is still running
185+
RuntimeError: If workflow fails or terminates abnormally
186+
"""
187+
if use_api_sync:
188+
# Use the API's built-in synchronous execution
189+
response = await self.execute_workflow_async(
190+
workflow_identifier=workflow_identifier,
191+
input=input,
192+
execution_id=execution_id,
193+
wait_for_result=True,
194+
timeout_seconds=timeout_seconds,
195+
custom_tracing_attributes=custom_tracing_attributes,
196+
deployment_name=deployment_name,
197+
)
198+
return response.result
199+
200+
# Use polling method
201+
execution = await self.execute_workflow_async(
202+
workflow_identifier=workflow_identifier,
203+
input=input,
204+
execution_id=execution_id,
205+
custom_tracing_attributes=custom_tracing_attributes,
206+
deployment_name=deployment_name,
207+
)
208+
209+
# Wait for completion
210+
final_execution = await self._wait_for_workflow_completion_async(
211+
execution.execution_id, polling_interval, max_attempts
212+
)
213+
214+
return final_execution.result
215+
216+
async def _wait_for_workflow_completion_async(
217+
self,
218+
execution_id: str,
219+
polling_interval: int = 5,
220+
max_attempts: Optional[int] = None,
221+
) -> models.WorkflowExecutionResponse:
222+
"""Wait for a workflow to complete by polling its status (async version).
223+
224+
Args:
225+
execution_id: Execution ID of the workflow
226+
polling_interval: Seconds between status checks
227+
max_attempts: Maximum number of polling attempts (None for unlimited)
228+
229+
Returns:
230+
WorkflowExecutionResponse with the final execution details
231+
232+
Raises:
233+
TimeoutError: If max_attempts is reached and workflow is still running
234+
RuntimeError: If workflow fails or terminates abnormally
235+
"""
236+
attempts = 0
237+
while True:
238+
response = await self.executions.get_workflow_execution_async(
239+
execution_id=execution_id
240+
)
241+
242+
if response.status != "RUNNING":
243+
if response.status == "COMPLETED":
244+
return response
245+
raise RuntimeError(f"Workflow failed with status: {response.status}")
246+
247+
attempts += 1
248+
if max_attempts is not None and attempts >= max_attempts:
249+
raise TimeoutError(
250+
f"Workflow is still running after {max_attempts} polling attempts"
251+
)
252+
253+
await asyncio.sleep(polling_interval)
254+
255+
# endregion sdk-class-body
256+
52257
def get_workflows(
53258
self,
54259
*,

0 commit comments

Comments
 (0)