Skip to content

fix(agents): complete tool-using runs — sequential session use + PromptedOutput (#172, #173)#174

Merged
w7-mgfcode merged 4 commits into
devfrom
fix/agents-complete-tool-runs
May 18, 2026
Merged

fix(agents): complete tool-using runs — sequential session use + PromptedOutput (#172, #173)#174
w7-mgfcode merged 4 commits into
devfrom
fix/agents-complete-tool-runs

Conversation

@w7-mgfcode
Copy link
Copy Markdown
Owner

@w7-mgfcode w7-mgfcode commented May 18, 2026

Summary

Closes #172 and #173. Both bugs were found by a capture_run_messages diagnostic of a failing experiment-agent run.

#172 — concurrent tool calls share one AsyncSession

When the model emits multiple DB-touching tool calls in one turn (e.g. tool_run_backtest for naive and seasonal_naive), PydanticAI runs them concurrently. Every tool shares the single AgentDeps.db AsyncSession; SQLAlchemy forbids concurrent operations → InvalidRequestError. Affects every model.

Fix: wrap agent.run() / agent.run_stream() in Agent.parallel_tool_call_execution_mode("sequential").

#173ToolOutput mode too strict for weaker models

output_type=ExperimentReport / RAGAnswer defaulted to PydanticAI's ToolOutput mode. Weaker/local models answer in prose; PydanticAI rejects it (json_invalid) and the run fails with Exceeded maximum output retries.

Fix: wrap both output_types in PromptedOutput(...) — schema-in-prompt + text-JSON parse. Works for local and cloud models.

Tests

  • test_chat_runs_tools_sequentiallychat() runs under sequential execution mode.
  • test_experiment_agent_uses_prompted_output / test_rag_assistant_agent_uses_prompted_output — both agents build with a PromptedOutputSchema.

Verification

  • ruff · ✅ mypy --strict · ✅ pyright (0 errors) · ✅ 112 agent unit tests
  • Diagnostic confirmed: with the sequential fix the concurrency error is gone and both backtests complete; PromptedOutput is active and validates the model's JSON output.

Note

These two fixes make the agent layer correct and robust. They do not, on their own, make a small local model (qwen2.5:7b) reliably complete a multi-tool experiment — that remains a model-capability matter tracked separately.

Summary by Sourcery

Ensure agent tool calls run sequentially and relax structured output handling for experiment and RAG agents to support weaker models.

Bug Fixes:

  • Run chat() and stream_chat() under sequential tool execution to avoid concurrent database access on a shared AsyncSession.
  • Use PromptedOutput-based schemas for ExperimentReport and RAGAnswer outputs so weaker/local models can successfully return structured results instead of failing strict ToolOutput validation.

Tests:

  • Add chat() regression test to assert sequential tool execution mode is active during runs.
  • Add regression tests to confirm experiment and RAG assistant agents are configured with PromptedOutput-based output schemas.

#172)

When the model emitted multiple DB-touching tool calls in one turn,
PydanticAI executed them concurrently. Every agent tool shares the
single AgentDeps.db AsyncSession, and SQLAlchemy forbids concurrent
operations on one session, so the run failed intermittently with
"InvalidRequestError: concurrent operations are not permitted".

- Wrap agent.run() (chat) and agent.run_stream() (stream_chat) in
  Agent.parallel_tool_call_execution_mode("sequential").
- Add a regression test asserting chat() runs under sequential mode.
…ed output (#173)

Both agents declared output_type as a plain model, which PydanticAI
serves via its default ToolOutput mode (the model must call a hidden
final_result tool). Weaker/local models answer in plain prose instead,
PydanticAI rejects it as json_invalid, and the run fails with
"Exceeded maximum output retries".

- Wrap the experiment and rag_assistant output_type in PromptedOutput,
  which places the JSON schema in the prompt and parses the model's
  text reply. Works for local and cloud models alike.
- Add tests asserting both agents build with a PromptedOutputSchema.
@sourcery-ai
Copy link
Copy Markdown

sourcery-ai Bot commented May 18, 2026

Reviewer's Guide

Ensures agent tool calls run sequentially to avoid AsyncSession concurrency errors and switches experiment and RAG agents to PromptedOutput-based structured outputs for better compatibility with weaker/local models, with accompanying regression tests.

Sequence diagram for sequential tool execution in chat and stream_chat

sequenceDiagram
    actor Client
    participant AgentsService
    participant Agent
    participant AgentDeps_db as AgentDeps_db

    Client->>AgentsService: chat(message) / stream_chat(message)
    activate AgentsService

    AgentsService->>Agent: parallel_tool_call_execution_mode(sequential)
    activate Agent
    Agent-->>AgentsService: execution_mode_context

    AgentsService->>Agent: run(message, deps, message_history)
    note over Agent: Tool calls execute sequentially
    loop per_tool_call
        Agent->>AgentDeps_db: [DB operation]
        AgentDeps_db-->>Agent: result
    end
    Agent-->>AgentsService: result
    deactivate Agent

    AgentsService-->>Client: response / StreamEvent(complete)
    deactivate AgentsService
Loading

File-Level Changes

Change Details Files
Run both non-streaming and streaming agent calls under sequential tool execution mode to avoid concurrent DB access on a shared AsyncSession.
  • Wrap chat() agent.run(...) invocation in Agent.parallel_tool_call_execution_mode("sequential") while preserving the existing asyncio.wait_for timeout behavior.
  • Wrap stream_chat() agent.run_stream(...) context in Agent.parallel_tool_call_execution_mode("sequential") while preserving timeout, streaming, session update, and approval handling logic.
  • Add regression test that verifies chat() executes with the "sequential" parallel execution mode by capturing the active mode from _parallel_execution_mode_ctx_var during a mocked agent.run().
app/features/agents/service.py
app/features/agents/tests/test_service.py
Use PromptedOutput-based structured output for experiment and RAG agents so weaker/local models can return schema-conformant JSON via text parsing instead of strict ToolOutput mode.
  • Change create_experiment_agent() to configure the Agent with output_type=PromptedOutput(ExperimentReport) instead of the default ToolOutput-based ExperimentReport.
  • Change create_rag_assistant_agent() to configure the Agent with output_type=PromptedOutput(RAGAnswer).
  • Add regression tests that assert both experiment and RAG assistant agents are built with a PromptedOutputSchema output schema when using a local/ollama default model.
app/features/agents/agents/experiment.py
app/features/agents/agents/rag_assistant.py
app/features/agents/tests/test_base.py

Possibly linked issues


Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 012af6cd-9157-4197-875f-beef4b612864

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/agents-complete-tool-runs

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 3 issues, and left some high level feedback:

  • The new tests reach into private internals (_parallel_execution_mode_ctx_var, agent._output_schema), which makes them brittle against library changes; consider asserting behavior via public APIs or helper wrappers instead of relying on private attributes.
  • The Agent.parallel_tool_call_execution_mode("sequential") wrapping is duplicated in both chat and stream_chat; consider extracting this into a small helper/context wrapper to keep the execution-mode policy in one place.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new tests reach into private internals (`_parallel_execution_mode_ctx_var`, `agent._output_schema`), which makes them brittle against library changes; consider asserting behavior via public APIs or helper wrappers instead of relying on private attributes.
- The `Agent.parallel_tool_call_execution_mode("sequential")` wrapping is duplicated in both `chat` and `stream_chat`; consider extracting this into a small helper/context wrapper to keep the execution-mode policy in one place.

## Individual Comments

### Comment 1
<location path="app/features/agents/tests/test_service.py" line_range="338-347" />
<code_context>
+    async def test_chat_runs_tools_sequentially(
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a matching stream_chat test to assert sequential tool execution in the streaming path as well

Since `stream_chat()` now wraps `agent.run_stream(...)` in the same `Agent.parallel_tool_call_execution_mode("sequential")` context, it’d be helpful to add a streaming test that mirrors this one.

For example:
- Patch `_get_agent` to return a mock whose async `run_stream` context manager records `_parallel_execution_mode_ctx_var.get()`.
- Call `service.stream_chat(...)` and fully consume the async generator.
- Assert the recorded mode is `"sequential"`.

This will ensure both the sync and streaming paths are covered for issue #172 and protect against regressions if only one code path is later changed.

Suggested implementation:

```python
from unittest.mock import AsyncMock

from pydantic_ai.tool_manager import _parallel_execution_mode_ctx_var

from app.features.agents.deps import AgentDeps
from app.features.agents.models import AgentSession, AgentType, SessionStatus

```

```python
    @pytest.mark.asyncio
    async def test_chat_runs_tools_sequentially(
        self,
        sample_active_session: AgentSession,
        sample_experiment_report: ExperimentReport,
    ) -> None:
        """chat() must run the agent under sequential tool execution.

        Regression for issue #172: every tool shares the single AgentDeps.db
        AsyncSession, so concurrent tool calls raised InvalidRequestError.
        """
        service = AgentService()

    @pytest.mark.asyncio
    async def test_stream_chat_runs_tools_sequentially(
        self,
        sample_active_session: AgentSession,
        sample_experiment_report: ExperimentReport,
    ) -> None:
        """stream_chat() must run the agent under sequential tool execution.

        Mirrors test_chat_runs_tools_sequentially for the streaming path.
        """
        service = AgentService()

        recorded_modes: list[str] = []

        class MockRunStreamContext:
            async def __aenter__(self_inner):
                # Record the execution mode when run_stream is entered
                recorded_modes.append(_parallel_execution_mode_ctx_var.get())

                async def _gen():
                    # Yield at least one event so the async generator is consumed
                    yield {"event": "test"}

                return _gen()

            async def __aexit__(self_inner, exc_type, exc, tb) -> bool:
                return False

        mock_agent = AsyncMock()
        mock_agent.run_stream.return_value = MockRunStreamContext()

        async def _get_agent_override(*args, **kwargs):
            return mock_agent

        # Patch the service to use our mock agent
        service._get_agent = _get_agent_override

        # Consume the streaming response to ensure run_stream is actually entered
        async for _ in service.stream_chat(
            session=sample_active_session,
            message="hello",
            experiment_report=sample_experiment_report,
        ):
            pass

        assert recorded_modes == ["sequential"]

```

1. If `AgentService.stream_chat(...)` has a different signature (e.g., different parameter names or does not accept `experiment_report`), adjust the call in `test_stream_chat_runs_tools_sequentially` to match the actual method signature.
2. If the tests use a shared fixture or factory to construct `AgentService` instances instead of calling `AgentService()` directly, replace `service = AgentService()` with the appropriate fixture usage for consistency with the rest of the test file.
3. If `stream_chat` returns a different kind of stream payload than a dict with `{"event": "test"}`, the concrete value yielded in `_gen()` is irrelevant to this assertion and can be adjusted or simplified as needed.
</issue_to_address>

### Comment 2
<location path="app/features/agents/tests/test_base.py" line_range="48-57" />
<code_context>
     validate_api_key_for_model("ollama:llama3.1")
+
+
+def test_experiment_agent_uses_prompted_output():
+    """The experiment agent uses PromptedOutput, not the default ToolOutput.
+
+    Regression for issue #173: weaker/local models cannot satisfy the
+    default tool-based structured-output mode and fail output validation.
+    """
+    settings = get_settings()
+    settings.agent_default_model = "ollama:llama3.1"
+    agent = create_experiment_agent()
+    assert type(agent._output_schema).__name__ == "PromptedOutputSchema"
+
+
</code_context>
<issue_to_address>
**suggestion:** Make the PromptedOutput assertion more robust than checking the class name string

This assertion is brittle because it relies on the class name string, so a rename or refactor could silently weaken this regression test for #173. Instead, consider asserting on the actual type (e.g. `from pydantic_ai import PromptedOutputSchema` and `assert isinstance(agent._output_schema, PromptedOutputSchema)`) or on a stable attribute (e.g. that it wraps `ExperimentReport` / `RAGAnswer`, such as `agent._output_schema.output_type is ExperimentReport`). This keeps the regression meaningful while remaining robust to internal changes.

Suggested implementation:

```python
from pydantic_ai import PromptedOutputSchema

from app.features.agents.agents.experiment import create_experiment_agent
from app.features.agents.agents.rag_assistant import create_rag_assistant_agent

```

```python
    agent = create_experiment_agent()
    assert isinstance(agent._output_schema, PromptedOutputSchema)

```
</issue_to_address>

### Comment 3
<location path="app/features/agents/tests/test_base.py" line_range="60-57" />
<code_context>
+    assert type(agent._output_schema).__name__ == "PromptedOutputSchema"
+
+
+def test_rag_assistant_agent_uses_prompted_output():
+    """The RAG assistant agent uses PromptedOutput (issue #173)."""
+    settings = get_settings()
+    settings.agent_default_model = "ollama:llama3.1"
+    agent = create_rag_assistant_agent()
+    assert type(agent._output_schema).__name__ == "PromptedOutputSchema"
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a behavioral test to show PromptedOutput tolerates prose + JSON from weaker models

These tests verify that `PromptedOutput` is wired into both agents, but they don’t yet demonstrate that it fixes the original failure. To cover that, consider a behavioral test using a fake/mocked model that returns mixed prose + JSON (like weaker/local models). Then assert that the agent run succeeds and produces a valid `ExperimentReport`/`RAGAnswer` rather than failing with `json_invalid` or exhausting retries. Even a small unit/integration test like this would show that the parsing behavior actually resolves the issue from #173.

Suggested implementation:

```python
def test_rag_assistant_agent_uses_prompted_output():
    """The RAG assistant agent uses PromptedOutput (issue #173)."""
    settings = get_settings()
    settings.agent_default_model = "ollama:llama3.1"
    agent = create_rag_assistant_agent()
    assert type(agent._output_schema).__name__ == "PromptedOutputSchema"


def test_experiment_agent_prompted_output_tolerates_prose_plus_json(monkeypatch):
    """
    PromptedOutput for the experiment agent can parse mixed prose + JSON from weaker/local models.

    This is a behavioral regression test for issue #173:
    the agent should succeed and return a valid ExperimentReport instead of failing
    with json_invalid / retry exhaustion when the model wraps JSON in natural language.
    """
    settings = get_settings()
    settings.agent_default_model = "ollama:llama3.1"
    agent = create_experiment_agent()

    mixed_prose_and_json = """
    Here's your experiment report, let me explain it first.

    The experiment went quite well, and I've summarized the key details below.
    Please find the structured JSON report after this explanation:

    {
        "title": "Effect of temperature on widget performance",
        "summary": "Higher temperatures led to a measurable decrease in widget efficiency.",
        "hypothesis": "Widget efficiency decreases as temperature increases.",
        "methodology": "We tested 50 widgets at 20°C, 30°C, and 40°C, measuring throughput.",
        "results": "Throughput decreased by ~15% from 20°C to 40°C.",
        "conclusion": "Temperature has a negative impact on widget performance.",
        "limitations": "Limited sample size and controlled lab environment.",
        "next_steps": "Repeat experiment in production-like conditions with more widgets."
    }

    Hope this helps!
    """

    class FakeModel:
        """Fake model that simulates a weaker/local model returning prose + JSON."""

        def run(self, *args, **kwargs):
            # The agent's PromptedOutput layer should robustly extract the JSON portion
            # and parse it into an ExperimentReport without raising validation errors.
            return mixed_prose_and_json

    # Monkeypatch the agent's underlying model invocation to use our fake model.
    # Adjust the attribute/method here to match the actual implementation
    # (e.g. `agent._model`, `agent._llm`, `agent._client`, etc.).
    fake_model = FakeModel()
    if hasattr(agent, "_model"):
        monkeypatch.setattr(agent, "_model", fake_model)
    elif hasattr(agent, "model"):
        monkeypatch.setattr(agent, "model", fake_model)
    else:
        # Fallback: assume the agent delegates to a callable `_call_model`/`_call_llm`
        # that we can monkeypatch directly to return the mixed string.
        if hasattr(agent, "_call_model"):
            monkeypatch.setattr(agent, "_call_model", lambda *a, **k: mixed_prose_and_json)
        elif hasattr(agent, "_call_llm"):
            monkeypatch.setattr(agent, "_call_llm", lambda *a, **k: mixed_prose_and_json)
        else:
            # If the agent exposes a generic `run_llm` we patch that.
            monkeypatch.setattr(agent, "run_llm", lambda *a, **k: mixed_prose_and_json, raising=False)

    # Run the agent end-to-end and assert we get a successfully parsed ExperimentReport.
    report = agent.run("Run an experiment about widget performance vs temperature")

    # The exact type may be a dataclass, pydantic model, or alias; we check against the
    # domain type used for experiment reports.
    assert isinstance(report, ExperimentReport)
    assert report.title
    assert "temperature" in report.summary.lower()


def test_rag_assistant_agent_prompted_output_tolerates_prose_plus_json(monkeypatch):
    """
    PromptedOutput for the RAG assistant can parse mixed prose + JSON from weaker/local models.

    This ensures that the fix for issue #173 is effective for the RAG assistant as well:
    the agent should return a valid RAGAnswer rather than failing JSON validation.
    """
    settings = get_settings()
    settings.agent_default_model = "ollama:llama3.1"
    agent = create_rag_assistant_agent()

    mixed_prose_and_json = """
    I found some relevant documents for your question. I'll first explain,
    then provide a JSON answer object you can consume programmatically.

    In short, the documentation says that widgets should not be operated
    above 45°C for extended periods. Below is the structured answer:

    {
        "answer": "Widgets should not be operated above 45°C for extended periods due to reduced efficiency and higher failure rates.",
        "citations": [
            {
                "source_id": "doc-123",
                "excerpt": "Operating widgets above 45°C significantly reduces their lifespan.",
                "url": "https://example.com/widgets/temperature-guidelines"
            }
        ],
        "metadata": {
            "confidence": 0.87
        }
    }

    Let me know if you need more details.
    """

    class FakeModel:
        """Fake model that simulates a weaker/local model returning prose + JSON."""

        def run(self, *args, **kwargs):
            return mixed_prose_and_json

    fake_model = FakeModel()
    if hasattr(agent, "_model"):
        monkeypatch.setattr(agent, "_model", fake_model)
    elif hasattr(agent, "model"):
        monkeypatch.setattr(agent, "model", fake_model)
    else:
        if hasattr(agent, "_call_model"):
            monkeypatch.setattr(agent, "_call_model", lambda *a, **k: mixed_prose_and_json)
        elif hasattr(agent, "_call_llm"):
            monkeypatch.setattr(agent, "_call_llm", lambda *a, **k: mixed_prose_and_json)
        else:
            monkeypatch.setattr(agent, "run_llm", lambda *a, **k: mixed_prose_and_json, raising=False)

    answer = agent.run("What are the temperature guidelines for widgets?")

    assert isinstance(answer, RAGAnswer)
    assert "widgets" in answer.answer.lower()
    assert answer.citations

```

1. Ensure the domain types `ExperimentReport` and `RAGAnswer` are imported at the top of `app/features/agents/tests/test_base.py` from their actual locations in your codebase. For example:
   - `from app.features.agents.schemas import ExperimentReport, RAGAnswer`
   or the equivalent module where these are defined.
2. The monkeypatching logic assumes a small set of possible attributes/methods (`_model`, `model`, `_call_model`, `_call_llm`, `run_llm`) through which the agents talk to the underlying LLM. Adjust the monkeypatch targets to match the real implementation:
   - If `create_experiment_agent()` / `create_rag_assistant_agent()` expose a different attribute for the LLM client (e.g. `agent._client`, `agent.llm`), update the branches accordingly so that calling `agent.run(...)` uses the fake model output.
3. If `agent.run(...)` is asynchronous (e.g. `async def run(...)`), convert these tests to use `pytest.mark.asyncio` and `await agent.run(...)` instead of a direct call.
4. If the agents return a wrapper object or `PromptedOutput` instance that contains `.parsed` or similar instead of returning `ExperimentReport` / `RAGAnswer` directly, update the assertions to inspect the parsed payload, e.g.:
   - `report = agent.run(...).parsed`
   - `answer = agent.run(...).parsed`
5. If the mixed prose + JSON parsing is surfaced via specific error fields (e.g. `json_invalid`, `retries_exhausted`) on the result, you can strengthen the regression check by asserting those are not set in addition to the successful type checks.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +338 to +347
async def test_chat_runs_tools_sequentially(
self,
sample_active_session: AgentSession,
sample_experiment_report: ExperimentReport,
) -> None:
"""chat() must run the agent under sequential tool execution.

Regression for issue #172: every tool shares the single AgentDeps.db
AsyncSession, so concurrent tool calls raised InvalidRequestError.
"""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding a matching stream_chat test to assert sequential tool execution in the streaming path as well

Since stream_chat() now wraps agent.run_stream(...) in the same Agent.parallel_tool_call_execution_mode("sequential") context, it’d be helpful to add a streaming test that mirrors this one.

For example:

  • Patch _get_agent to return a mock whose async run_stream context manager records _parallel_execution_mode_ctx_var.get().
  • Call service.stream_chat(...) and fully consume the async generator.
  • Assert the recorded mode is "sequential".

This will ensure both the sync and streaming paths are covered for issue #172 and protect against regressions if only one code path is later changed.

Suggested implementation:

from unittest.mock import AsyncMock

from pydantic_ai.tool_manager import _parallel_execution_mode_ctx_var

from app.features.agents.deps import AgentDeps
from app.features.agents.models import AgentSession, AgentType, SessionStatus
    @pytest.mark.asyncio
    async def test_chat_runs_tools_sequentially(
        self,
        sample_active_session: AgentSession,
        sample_experiment_report: ExperimentReport,
    ) -> None:
        """chat() must run the agent under sequential tool execution.

        Regression for issue #172: every tool shares the single AgentDeps.db
        AsyncSession, so concurrent tool calls raised InvalidRequestError.
        """
        service = AgentService()

    @pytest.mark.asyncio
    async def test_stream_chat_runs_tools_sequentially(
        self,
        sample_active_session: AgentSession,
        sample_experiment_report: ExperimentReport,
    ) -> None:
        """stream_chat() must run the agent under sequential tool execution.

        Mirrors test_chat_runs_tools_sequentially for the streaming path.
        """
        service = AgentService()

        recorded_modes: list[str] = []

        class MockRunStreamContext:
            async def __aenter__(self_inner):
                # Record the execution mode when run_stream is entered
                recorded_modes.append(_parallel_execution_mode_ctx_var.get())

                async def _gen():
                    # Yield at least one event so the async generator is consumed
                    yield {"event": "test"}

                return _gen()

            async def __aexit__(self_inner, exc_type, exc, tb) -> bool:
                return False

        mock_agent = AsyncMock()
        mock_agent.run_stream.return_value = MockRunStreamContext()

        async def _get_agent_override(*args, **kwargs):
            return mock_agent

        # Patch the service to use our mock agent
        service._get_agent = _get_agent_override

        # Consume the streaming response to ensure run_stream is actually entered
        async for _ in service.stream_chat(
            session=sample_active_session,
            message="hello",
            experiment_report=sample_experiment_report,
        ):
            pass

        assert recorded_modes == ["sequential"]
  1. If AgentService.stream_chat(...) has a different signature (e.g., different parameter names or does not accept experiment_report), adjust the call in test_stream_chat_runs_tools_sequentially to match the actual method signature.
  2. If the tests use a shared fixture or factory to construct AgentService instances instead of calling AgentService() directly, replace service = AgentService() with the appropriate fixture usage for consistency with the rest of the test file.
  3. If stream_chat returns a different kind of stream payload than a dict with {"event": "test"}, the concrete value yielded in _gen() is irrelevant to this assertion and can be adjusted or simplified as needed.

Comment thread app/features/agents/tests/test_base.py Outdated
Comment on lines +48 to +57
def test_experiment_agent_uses_prompted_output():
"""The experiment agent uses PromptedOutput, not the default ToolOutput.

Regression for issue #173: weaker/local models cannot satisfy the
default tool-based structured-output mode and fail output validation.
"""
settings = get_settings()
settings.agent_default_model = "ollama:llama3.1"
agent = create_experiment_agent()
assert type(agent._output_schema).__name__ == "PromptedOutputSchema"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Make the PromptedOutput assertion more robust than checking the class name string

This assertion is brittle because it relies on the class name string, so a rename or refactor could silently weaken this regression test for #173. Instead, consider asserting on the actual type (e.g. from pydantic_ai import PromptedOutputSchema and assert isinstance(agent._output_schema, PromptedOutputSchema)) or on a stable attribute (e.g. that it wraps ExperimentReport / RAGAnswer, such as agent._output_schema.output_type is ExperimentReport). This keeps the regression meaningful while remaining robust to internal changes.

Suggested implementation:

from pydantic_ai import PromptedOutputSchema

from app.features.agents.agents.experiment import create_experiment_agent
from app.features.agents.agents.rag_assistant import create_rag_assistant_agent
    agent = create_experiment_agent()
    assert isinstance(agent._output_schema, PromptedOutputSchema)

Comment thread app/features/agents/tests/test_base.py Outdated
settings = get_settings()
settings.agent_default_model = "ollama:llama3.1"
agent = create_experiment_agent()
assert type(agent._output_schema).__name__ == "PromptedOutputSchema"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding a behavioral test to show PromptedOutput tolerates prose + JSON from weaker models

These tests verify that PromptedOutput is wired into both agents, but they don’t yet demonstrate that it fixes the original failure. To cover that, consider a behavioral test using a fake/mocked model that returns mixed prose + JSON (like weaker/local models). Then assert that the agent run succeeds and produces a valid ExperimentReport/RAGAnswer rather than failing with json_invalid or exhausting retries. Even a small unit/integration test like this would show that the parsing behavior actually resolves the issue from #173.

Suggested implementation:

def test_rag_assistant_agent_uses_prompted_output():
    """The RAG assistant agent uses PromptedOutput (issue #173)."""
    settings = get_settings()
    settings.agent_default_model = "ollama:llama3.1"
    agent = create_rag_assistant_agent()
    assert type(agent._output_schema).__name__ == "PromptedOutputSchema"


def test_experiment_agent_prompted_output_tolerates_prose_plus_json(monkeypatch):
    """
    PromptedOutput for the experiment agent can parse mixed prose + JSON from weaker/local models.

    This is a behavioral regression test for issue #173:
    the agent should succeed and return a valid ExperimentReport instead of failing
    with json_invalid / retry exhaustion when the model wraps JSON in natural language.
    """
    settings = get_settings()
    settings.agent_default_model = "ollama:llama3.1"
    agent = create_experiment_agent()

    mixed_prose_and_json = """
    Here's your experiment report, let me explain it first.

    The experiment went quite well, and I've summarized the key details below.
    Please find the structured JSON report after this explanation:

    {
        "title": "Effect of temperature on widget performance",
        "summary": "Higher temperatures led to a measurable decrease in widget efficiency.",
        "hypothesis": "Widget efficiency decreases as temperature increases.",
        "methodology": "We tested 50 widgets at 20°C, 30°C, and 40°C, measuring throughput.",
        "results": "Throughput decreased by ~15% from 20°C to 40°C.",
        "conclusion": "Temperature has a negative impact on widget performance.",
        "limitations": "Limited sample size and controlled lab environment.",
        "next_steps": "Repeat experiment in production-like conditions with more widgets."
    }

    Hope this helps!
    """

    class FakeModel:
        """Fake model that simulates a weaker/local model returning prose + JSON."""

        def run(self, *args, **kwargs):
            # The agent's PromptedOutput layer should robustly extract the JSON portion
            # and parse it into an ExperimentReport without raising validation errors.
            return mixed_prose_and_json

    # Monkeypatch the agent's underlying model invocation to use our fake model.
    # Adjust the attribute/method here to match the actual implementation
    # (e.g. `agent._model`, `agent._llm`, `agent._client`, etc.).
    fake_model = FakeModel()
    if hasattr(agent, "_model"):
        monkeypatch.setattr(agent, "_model", fake_model)
    elif hasattr(agent, "model"):
        monkeypatch.setattr(agent, "model", fake_model)
    else:
        # Fallback: assume the agent delegates to a callable `_call_model`/`_call_llm`
        # that we can monkeypatch directly to return the mixed string.
        if hasattr(agent, "_call_model"):
            monkeypatch.setattr(agent, "_call_model", lambda *a, **k: mixed_prose_and_json)
        elif hasattr(agent, "_call_llm"):
            monkeypatch.setattr(agent, "_call_llm", lambda *a, **k: mixed_prose_and_json)
        else:
            # If the agent exposes a generic `run_llm` we patch that.
            monkeypatch.setattr(agent, "run_llm", lambda *a, **k: mixed_prose_and_json, raising=False)

    # Run the agent end-to-end and assert we get a successfully parsed ExperimentReport.
    report = agent.run("Run an experiment about widget performance vs temperature")

    # The exact type may be a dataclass, pydantic model, or alias; we check against the
    # domain type used for experiment reports.
    assert isinstance(report, ExperimentReport)
    assert report.title
    assert "temperature" in report.summary.lower()


def test_rag_assistant_agent_prompted_output_tolerates_prose_plus_json(monkeypatch):
    """
    PromptedOutput for the RAG assistant can parse mixed prose + JSON from weaker/local models.

    This ensures that the fix for issue #173 is effective for the RAG assistant as well:
    the agent should return a valid RAGAnswer rather than failing JSON validation.
    """
    settings = get_settings()
    settings.agent_default_model = "ollama:llama3.1"
    agent = create_rag_assistant_agent()

    mixed_prose_and_json = """
    I found some relevant documents for your question. I'll first explain,
    then provide a JSON answer object you can consume programmatically.

    In short, the documentation says that widgets should not be operated
    above 45°C for extended periods. Below is the structured answer:

    {
        "answer": "Widgets should not be operated above 45°C for extended periods due to reduced efficiency and higher failure rates.",
        "citations": [
            {
                "source_id": "doc-123",
                "excerpt": "Operating widgets above 45°C significantly reduces their lifespan.",
                "url": "https://example.com/widgets/temperature-guidelines"
            }
        ],
        "metadata": {
            "confidence": 0.87
        }
    }

    Let me know if you need more details.
    """

    class FakeModel:
        """Fake model that simulates a weaker/local model returning prose + JSON."""

        def run(self, *args, **kwargs):
            return mixed_prose_and_json

    fake_model = FakeModel()
    if hasattr(agent, "_model"):
        monkeypatch.setattr(agent, "_model", fake_model)
    elif hasattr(agent, "model"):
        monkeypatch.setattr(agent, "model", fake_model)
    else:
        if hasattr(agent, "_call_model"):
            monkeypatch.setattr(agent, "_call_model", lambda *a, **k: mixed_prose_and_json)
        elif hasattr(agent, "_call_llm"):
            monkeypatch.setattr(agent, "_call_llm", lambda *a, **k: mixed_prose_and_json)
        else:
            monkeypatch.setattr(agent, "run_llm", lambda *a, **k: mixed_prose_and_json, raising=False)

    answer = agent.run("What are the temperature guidelines for widgets?")

    assert isinstance(answer, RAGAnswer)
    assert "widgets" in answer.answer.lower()
    assert answer.citations
  1. Ensure the domain types ExperimentReport and RAGAnswer are imported at the top of app/features/agents/tests/test_base.py from their actual locations in your codebase. For example:
    • from app.features.agents.schemas import ExperimentReport, RAGAnswer
      or the equivalent module where these are defined.
  2. The monkeypatching logic assumes a small set of possible attributes/methods (_model, model, _call_model, _call_llm, run_llm) through which the agents talk to the underlying LLM. Adjust the monkeypatch targets to match the real implementation:
    • If create_experiment_agent() / create_rag_assistant_agent() expose a different attribute for the LLM client (e.g. agent._client, agent.llm), update the branches accordingly so that calling agent.run(...) uses the fake model output.
  3. If agent.run(...) is asynchronous (e.g. async def run(...)), convert these tests to use pytest.mark.asyncio and await agent.run(...) instead of a direct call.
  4. If the agents return a wrapper object or PromptedOutput instance that contains .parsed or similar instead of returning ExperimentReport / RAGAnswer directly, update the assertions to inspect the parsed payload, e.g.:
    • report = agent.run(...).parsed
    • answer = agent.run(...).parsed
  5. If the mixed prose + JSON parsing is surfaced via specific error fields (e.g. json_invalid, retries_exhausted) on the result, you can strengthen the regression check by asserting those are not set in addition to the successful type checks.

…den agent tests (#173)

Addresses code-review feedback on PR #174:

- Extract the duplicated Agent.parallel_tool_call_execution_mode("sequential")
  wrapping from chat() and stream_chat() into a _sequential_tool_execution()
  helper, so the issue-#172 execution-mode policy lives in one place.
- Replace test reliance on private internals. test_base.py no longer asserts
  agent._output_schema's class-name string; it now verifies PromptedOutput
  behaviorally via the public FunctionModel test double (no final_result
  output tool registered, plain-text JSON reply parsed into the schema).
- test_service.py drops the private _parallel_execution_mode_ctx_var import
  and asserts the public Agent.parallel_tool_call_execution_mode API instead.
- Add test_stream_chat_runs_tools_sequentially mirroring the chat() test so
  the streaming path is covered against issue #172 regressions.
…ol-runs

# Conflicts:
#	app/features/agents/tests/test_base.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant