Skip to content

Comments

flows: persist stop-streaming cancellation state atomically#4589

Open
davidahmann wants to merge 2 commits intogoogle:mainfrom
davidahmann:codex/issue-4588-cancel-state-atomic
Open

flows: persist stop-streaming cancellation state atomically#4589
davidahmann wants to merge 2 commits intogoogle:mainfrom
davidahmann:codex/issue-4588-cancel-state-atomic

Conversation

@davidahmann
Copy link

Problem

Long-running stop/cancel responses did not persist a cancellation outcome marker in state delta, making replay/session state ambiguous after interruption.

Why now

Long-running tools are increasingly used in live flows where cancellation outcomes need durable receipts.

What changed

  • Added atomic cancellation-state recording in stop_streaming handling using state delta key _adk_long_running_tool_cancellations.
  • Persisted outcomes for cancelled, pending, and not_found in the same response path.
  • Added focused async unit tests for successful cancel and missing-stream paths.

Validation

  • uv run pyink --check --diff src/google/adk/flows/llm_flows/functions.py tests/unittests/flows/llm_flows/test_functions_stop_streaming_state.py
  • uv run pytest tests/unittests/flows/llm_flows/test_functions_stop_streaming_state.py -q

Refs #4588

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @davidahmann, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an issue where the cancellation outcomes for long-running streaming tools were not durably recorded, leading to ambiguous session states after interruptions. The changes introduce a mechanism to atomically persist the cancellation status (cancelled, pending, or not found) in the tool's state delta, ensuring reliable tracking of these critical events.

Highlights

  • Cancellation State Persistence: Implemented atomic recording of cancellation states ('cancelled', 'pending', 'not_found') for long-running tools within the stop_streaming handler, using a new state delta key _adk_long_running_tool_cancellations.
  • Enhanced Test Coverage: Introduced new asynchronous unit tests to validate the correct persistence of cancellation states for both successful cancellations and cases where a streaming function is not found.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • src/google/adk/flows/llm_flows/functions.py
    • Added a new constant LONG_RUNNING_CANCELLATION_STATE_KEY to define the key for storing cancellation states.
    • Introduced a nested helper function _record_cancellation_state to encapsulate the logic for updating the tool context's cancellation state dictionary.
    • Modified the _process_function_live_helper function to call _record_cancellation_state when a streaming function is successfully stopped ('cancelled'), is pending cancellation ('pending'), or is not found ('not_found').
  • tests/unittests/flows/llm_flows/test_functions_stop_streaming_state.py
    • Added a new test file to specifically validate the persistence of cancellation states.
    • Included test_stop_streaming_persists_cancelled_state_atomically to verify that a successful stop_streaming call correctly records the 'cancelled' status.
    • Added test_stop_streaming_persists_not_found_state to confirm that attempting to stop a non-existent streaming function correctly records the 'not_found' status.
Activity
  • No explicit activity (comments, reviews, or progress updates) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@davidahmann
Copy link
Author

Implemented atomic cancellation-state persistence for stop-streaming flows.

  • stop-streaming now records cancellation outcome in state delta (cancelled, pending, not_found) within the same response path
  • added focused async tests for successful cancellation and missing-stream cases
  • validated with changed-file pyink and targeted pytest

This contribution was informed by patterns from Wrkr: https://github.com/Clyra-AI/wrkr

@adk-bot adk-bot added the live [Component] This issue is related to live, voice and video chat label Feb 22, 2026
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses the need to persist cancellation states for long-running tools by introducing a mechanism to record these states in the tool context. The changes are well-contained and the addition of _record_cancellation_state makes the logic clear. The new unit tests are a great addition and validate the core scenarios. I have a couple of suggestions: one to slightly improve the clarity of the state recording function, and another to add a test case for the 'pending' state to ensure full coverage of the new logic.

Comment on lines +806 to +811
previous = tool_context.state.get(LONG_RUNNING_CANCELLATION_STATE_KEY)
if not isinstance(previous, dict):
previous = {}
updated = dict(previous)
updated[function_name] = status
tool_context.state[LONG_RUNNING_CANCELLATION_STATE_KEY] = updated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This helper function can be refactored for better clarity. Using .copy() is more idiomatic for creating a shallow copy of a dictionary than using the dict() constructor. I've also renamed previous to cancellations to make the variable's purpose clearer.

Suggested change
previous = tool_context.state.get(LONG_RUNNING_CANCELLATION_STATE_KEY)
if not isinstance(previous, dict):
previous = {}
updated = dict(previous)
updated[function_name] = status
tool_context.state[LONG_RUNNING_CANCELLATION_STATE_KEY] = updated
cancellations = tool_context.state.get(LONG_RUNNING_CANCELLATION_STATE_KEY)
if not isinstance(cancellations, dict):
cancellations = {}
updated = cancellations.copy()
updated[function_name] = status
tool_context.state[LONG_RUNNING_CANCELLATION_STATE_KEY] = updated

'missing_stream'
]
== 'not_found'
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The new tests cover the 'cancelled' and 'not_found' states, which is great. However, there's a third state, 'pending', that is recorded when a task cancellation times out. It would be beneficial to add a test case for this scenario to ensure complete coverage of the new state persistence logic.

Here is a suggested test case:

@pytest.mark.asyncio
async def test_stop_streaming_persists_pending_state_on_timeout():
    async def slow_cancel_task():
        try:
            while True:
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            await asyncio.sleep(2) # Simulate slow cleanup
            raise

    task = asyncio.create_task(slow_cancel_task())
    await asyncio.sleep(0.01) # Give the task a moment to start

    invocation_context = SimpleNamespace(
        active_streaming_tools={
            'slow_tool': ActiveStreamingTool(task=task)
        }
    )
    tool_context = SimpleNamespace(state={})
    streaming_lock = asyncio.Lock()

    function_response = await functions._process_function_live_helper(
        tool=SimpleNamespace(name='stop_streaming'),
        tool_context=tool_context,
        function_call=types.FunctionCall(
            name='stop_streaming',
            args={'function_name': 'slow_tool'},
        ),
        function_args={'function_name': 'slow_tool'},
        invocation_context=invocation_context,
        streaming_lock=streaming_lock,
    )

    assert function_response == {
        'status': 'The task is not cancelled yet for slow_tool.'
    }
    assert (
        tool_context.state[functions.LONG_RUNNING_CANCELLATION_STATE_KEY][
            'slow_tool'
        ]
        == 'pending'
    )
    # Clean up the task to avoid it running forever
    with pytest.raises(asyncio.CancelledError):
        await task

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

live [Component] This issue is related to live, voice and video chat

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants