flows: persist stop-streaming cancellation state atomically#4589
flows: persist stop-streaming cancellation state atomically#4589davidahmann wants to merge 2 commits intogoogle:mainfrom
Conversation
Summary of ChangesHello @davidahmann, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses an issue where the cancellation outcomes for long-running streaming tools were not durably recorded, leading to ambiguous session states after interruptions. The changes introduce a mechanism to atomically persist the cancellation status (cancelled, pending, or not found) in the tool's state delta, ensuring reliable tracking of these critical events. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Implemented atomic cancellation-state persistence for stop-streaming flows.
This contribution was informed by patterns from Wrkr: https://github.com/Clyra-AI/wrkr |
There was a problem hiding this comment.
Code Review
This pull request effectively addresses the need to persist cancellation states for long-running tools by introducing a mechanism to record these states in the tool context. The changes are well-contained and the addition of _record_cancellation_state makes the logic clear. The new unit tests are a great addition and validate the core scenarios. I have a couple of suggestions: one to slightly improve the clarity of the state recording function, and another to add a test case for the 'pending' state to ensure full coverage of the new logic.
| previous = tool_context.state.get(LONG_RUNNING_CANCELLATION_STATE_KEY) | ||
| if not isinstance(previous, dict): | ||
| previous = {} | ||
| updated = dict(previous) | ||
| updated[function_name] = status | ||
| tool_context.state[LONG_RUNNING_CANCELLATION_STATE_KEY] = updated |
There was a problem hiding this comment.
This helper function can be refactored for better clarity. Using .copy() is more idiomatic for creating a shallow copy of a dictionary than using the dict() constructor. I've also renamed previous to cancellations to make the variable's purpose clearer.
| previous = tool_context.state.get(LONG_RUNNING_CANCELLATION_STATE_KEY) | |
| if not isinstance(previous, dict): | |
| previous = {} | |
| updated = dict(previous) | |
| updated[function_name] = status | |
| tool_context.state[LONG_RUNNING_CANCELLATION_STATE_KEY] = updated | |
| cancellations = tool_context.state.get(LONG_RUNNING_CANCELLATION_STATE_KEY) | |
| if not isinstance(cancellations, dict): | |
| cancellations = {} | |
| updated = cancellations.copy() | |
| updated[function_name] = status | |
| tool_context.state[LONG_RUNNING_CANCELLATION_STATE_KEY] = updated |
| 'missing_stream' | ||
| ] | ||
| == 'not_found' | ||
| ) |
There was a problem hiding this comment.
The new tests cover the 'cancelled' and 'not_found' states, which is great. However, there's a third state, 'pending', that is recorded when a task cancellation times out. It would be beneficial to add a test case for this scenario to ensure complete coverage of the new state persistence logic.
Here is a suggested test case:
@pytest.mark.asyncio
async def test_stop_streaming_persists_pending_state_on_timeout():
async def slow_cancel_task():
try:
while True:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
await asyncio.sleep(2) # Simulate slow cleanup
raise
task = asyncio.create_task(slow_cancel_task())
await asyncio.sleep(0.01) # Give the task a moment to start
invocation_context = SimpleNamespace(
active_streaming_tools={
'slow_tool': ActiveStreamingTool(task=task)
}
)
tool_context = SimpleNamespace(state={})
streaming_lock = asyncio.Lock()
function_response = await functions._process_function_live_helper(
tool=SimpleNamespace(name='stop_streaming'),
tool_context=tool_context,
function_call=types.FunctionCall(
name='stop_streaming',
args={'function_name': 'slow_tool'},
),
function_args={'function_name': 'slow_tool'},
invocation_context=invocation_context,
streaming_lock=streaming_lock,
)
assert function_response == {
'status': 'The task is not cancelled yet for slow_tool.'
}
assert (
tool_context.state[functions.LONG_RUNNING_CANCELLATION_STATE_KEY][
'slow_tool'
]
== 'pending'
)
# Clean up the task to avoid it running forever
with pytest.raises(asyncio.CancelledError):
await task
Problem
Long-running stop/cancel responses did not persist a cancellation outcome marker in state delta, making replay/session state ambiguous after interruption.
Why now
Long-running tools are increasingly used in live flows where cancellation outcomes need durable receipts.
What changed
stop_streaminghandling using state delta key_adk_long_running_tool_cancellations.cancelled,pending, andnot_foundin the same response path.Validation
uv run pyink --check --diff src/google/adk/flows/llm_flows/functions.py tests/unittests/flows/llm_flows/test_functions_stop_streaming_state.pyuv run pytest tests/unittests/flows/llm_flows/test_functions_stop_streaming_state.py -qRefs #4588