Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions openhands-sdk/openhands/sdk/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,10 @@ def step(
"skipping hook check for legacy conversation state."
)

# Prepare LLM messages using the utility function
# Prepare LLM messages from the cached, incrementally-maintained view.
# See https://github.com/OpenHands/software-agent-sdk/issues/3053.
_messages_or_condensation = prepare_llm_messages(
state.events, condenser=self.condenser, llm=self.llm
state.view, condenser=self.condenser, llm=self.llm
)

# Process condensation event before agent sampels another action
Expand Down Expand Up @@ -554,6 +555,13 @@ def step(
"triggering condensation retry with condensed history: "
f"{e}"
)
# The incremental view we just sent may itself be the source of
# the malformed history (e.g., a manipulation-indices bug that
# let a property slip). Re-derive it from the event log with
# full `enforce_properties` before kicking off the condensation
# retry, so the condenser operates on a clean, enforced view.
# See https://github.com/OpenHands/software-agent-sdk/issues/3053.
state.rebuild_view()
on_event(CondensationRequest())
return
logger.warning(
Expand Down
25 changes: 13 additions & 12 deletions openhands-sdk/openhands/sdk/agent/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import subprocess
import textwrap
import types
from collections.abc import Collection, Sequence
from collections.abc import Collection
from typing import (
Annotated,
Any,
Expand All @@ -21,7 +21,7 @@
from openhands.sdk.context.condenser.base import CondenserBase
from openhands.sdk.context.view import View
from openhands.sdk.conversation.types import ConversationTokenCallbackType
from openhands.sdk.event.base import Event, LLMConvertibleEvent
from openhands.sdk.event.base import LLMConvertibleEvent
from openhands.sdk.event.condenser import Condensation
from openhands.sdk.llm import LLM, LLMResponse, Message
from openhands.sdk.tool import Action, ToolDefinition
Expand Down Expand Up @@ -444,7 +444,7 @@ def normalize_tool_call(

@overload
def prepare_llm_messages(
events: Sequence[Event],
view: View,
condenser: None = None,
additional_messages: list[Message] | None = None,
llm: LLM | None = None,
Expand All @@ -453,27 +453,33 @@ def prepare_llm_messages(

@overload
def prepare_llm_messages(
events: Sequence[Event],
view: View,
condenser: CondenserBase,
additional_messages: list[Message] | None = None,
llm: LLM | None = None,
) -> list[Message] | Condensation: ...


def prepare_llm_messages(
events: Sequence[Event],
view: View,
condenser: CondenserBase | None = None,
additional_messages: list[Message] | None = None,
llm: LLM | None = None,
) -> list[Message] | Condensation:
"""Prepare LLM messages from conversation context.
"""Prepare LLM messages from a conversation view.

This utility function extracts the common logic for preparing conversation
context that is shared between agent.step() and ask_agent() methods.
It handles condensation internally and calls the callback when needed.

Callers should pass the cached `ConversationState.view`, which is
maintained incrementally as events are appended. This avoids paying the
O(n) `View.from_events` (with `enforce_properties`) cost on every step.
See https://github.com/OpenHands/software-agent-sdk/issues/3053.

Args:
events: Sequence of events to prepare messages from
view: A `View` of the conversation history. The view is treated as
read-only — see `CondenserBase.condense` for the same contract.
condenser: Optional condenser for handling context window limits
additional_messages: Optional additional messages to append
llm: Optional LLM instance from the agent, passed to condenser for
Expand All @@ -482,12 +488,7 @@ def prepare_llm_messages(
Returns:
List of messages ready for LLM completion, or a Condensation event
if condensation is needed

Raises:
RuntimeError: If condensation is needed but no callback is provided
"""

view = View.from_events(events)
llm_convertible_events: list[LLMConvertibleEvent] = view.events

# If a condenser is registered, we need to give it an
Expand Down
6 changes: 6 additions & 0 deletions openhands-sdk/openhands/sdk/context/condenser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat

Args:
view: A view of the history containing all events that should be condensed.
**Implementations must treat this view as read-only.** The view may be
a cached projection owned by `ConversationState`
(see https://github.com/OpenHands/software-agent-sdk/issues/3053), and
mutating it in place will corrupt that cache and silently desynchronize
the conversation from its event log. Return a new `View` (e.g.
`View(events=view.events[k:])`) or a `Condensation` event instead.
agent_llm: LLM instance used by the agent. Condensers use this for token
counting purposes. Defaults to None.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ def _default_callback(e):
# This callback runs while holding the conversation state's lock
# (see BaseConversation.compose_callbacks usage inside `with self._state:`
# regions), so updating state here is thread-safe.
self._state.events.append(e)
# Using `append_event` (instead of `state.events.append`) also
# incrementally updates the cached `state.view`, keeping the view
# in sync without paying the O(n) `View.from_events` cost on each
# step.
self._state.append_event(e)
# Track user MessageEvent IDs here so hook callbacks (which may
# synthesize or alter user messages) are captured in one place.
if isinstance(e, MessageEvent) and e.source == "user":
Expand Down Expand Up @@ -383,9 +387,13 @@ def fork(
)

# Deep-copy events from source → fork so the source stays
# immutable.
# immutable. Use `append_event` so the fork's cached view is
# built up incrementally alongside the event log, then call
# `rebuild_view` once at the end to run a full enforcement pass
# over the freshly-populated log (same posture as cold load).
for event in self._state.events:
fork_conv._state.events.append(event.model_copy(deep=True))
fork_conv._state.append_event(event.model_copy(deep=True))
fork_conv._state.rebuild_view()

# Copy runtime state that accumulated during the source
# conversation. activated_knowledge_skills is list[str] – strings
Expand Down Expand Up @@ -1054,7 +1062,7 @@ def ask_agent(self, question: str) -> str:
)

messages = prepare_llm_messages(
self.state.events, additional_messages=[user_message]
self.state.view, additional_messages=[user_message]
)

# Get or create the specialized ask-agent LLM
Expand Down
67 changes: 67 additions & 0 deletions openhands-sdk/openhands/sdk/conversation/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydantic import Field, PrivateAttr

from openhands.sdk.agent.base import AgentBase
from openhands.sdk.context.view import View
from openhands.sdk.conversation.conversation_stats import ConversationStats
from openhands.sdk.conversation.event_store import EventLog
from openhands.sdk.conversation.fifo_lock import FIFOLock
Expand Down Expand Up @@ -205,6 +206,12 @@ class ConversationState(OpenHandsModel):
# ===== Private attrs (NOT Fields) =====
_fs: FileStore = PrivateAttr() # filestore for persistence
_events: EventLog = PrivateAttr() # now the storage for events
# Cached, incrementally-maintained projection of `_events`. Derived state,
# so it is intentionally not a model field and never persisted. Updated in
# `append_event` on every new event and fully re-derived (with property
# enforcement) via `rebuild_view` on cold load, fork, or error recovery.
# See https://github.com/OpenHands/software-agent-sdk/issues/3053.
_view: View = PrivateAttr(default_factory=View)
_cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption
_autosave_enabled: bool = PrivateAttr(
default=False
Expand All @@ -225,6 +232,59 @@ class ConversationState(OpenHandsModel):
def events(self) -> EventLog:
return self._events

@property
def view(self) -> View:
"""Cached, incrementally-maintained `View` of the conversation events.

The view is updated in lockstep with `events` via `append_event`, so it
always reflects the current `_events` log without paying the O(n) cost
of `View.from_events` on every read. It is fully re-derived (with
property enforcement) only by `rebuild_view`, which is intended for
cold load, fork, and explicit error recovery — not the per-step hot
path.

Callers must treat the returned view as read-only. Mutating it
breaks the cache invariant and will cause silent divergence from
`events`.
"""
return self._view

def append_event(self, event: Event) -> None:
"""Append an event to the conversation, updating the cached view.

This is the only sanctioned write path for adding events to a
running conversation: it persists the event via `EventLog.append`
and incrementally updates the cached `View` so the two stay in
sync. Callers (`_default_callback`, fork, etc.) must hold the
conversation lock; see callers in `LocalConversation` for the
canonical pattern.

The incremental view update is O(1) for the common
`LLMConvertibleEvent` case and O(view-size) only when a
`Condensation` event is applied. It does not run
`enforce_properties`; that fallback path runs only via
`rebuild_view`.
"""
self._events.append(event)
self._view.append_event(event)

def rebuild_view(self) -> None:
"""Re-derive the cached view from the full event log.

Runs `View.from_events` over the persisted log, which applies all
view property enforcement. This is the fallback path described in
`ViewPropertyBase` and should be used only on:

- Cold load (resuming a persisted `ConversationState`), since the
on-disk events may have been written by an older version or
could be corrupted.
- Fork creation, after deep-copying events from the source state.
- Explicit error recovery (e.g., when the LLM rejects the current
message history as malformed and we want to re-validate the
view before retrying with condensation).
"""
self._view = View.from_events(self._events)

@property
def env_observation_persistence_dir(self) -> str | None:
"""Directory for persisting environment observation files."""
Expand Down Expand Up @@ -355,6 +415,13 @@ def create(
state._events = EventLog(file_store, dir_path=EVENTS_DIR)
state._cipher = cipher

# Build the cached view from the persisted event log. This is the
# one place where we pay the full `View.from_events` cost (which
# runs property enforcement) — events may have been written by an
# older version of the code or otherwise be in a bad state, so
# treating this as a cold-load checkpoint is appropriate.
state.rebuild_view()

# Verify compatibility (agent class + tools)
agent.verify(state.agent, events=state._events)

Expand Down
8 changes: 4 additions & 4 deletions tests/agent_server/test_conversation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ async def test_second_service_does_not_resume_active_running_conversation(tmp_pa
primary_state = await primary_event_service.get_state()

running_action = _create_running_terminal_action()
primary_state.events.append(running_action)
primary_state.append_event(running_action)
primary_state.execution_status = ConversationExecutionStatus.RUNNING

async with ConversationService(
Expand All @@ -243,7 +243,7 @@ async def test_second_service_does_not_resume_active_running_conversation(tmp_pa
assert secondary._event_services is not None
assert conversation_info.id not in secondary._event_services

primary_state.events.append(
primary_state.append_event(
ObservationEvent(
observation=TerminalObservation.from_text(
"done",
Expand Down Expand Up @@ -284,7 +284,7 @@ async def test_stale_owner_cannot_append_after_lease_takeover(tmp_path):
primary_state = await primary_event_service.get_state()

running_action = _create_running_terminal_action()
primary_state.events.append(running_action)
primary_state.append_event(running_action)
primary_state.execution_status = ConversationExecutionStatus.RUNNING
_expire_conversation_lease(conversations_dir, conversation_info.id)

Expand All @@ -301,7 +301,7 @@ async def test_stale_owner_cannot_append_after_lease_takeover(tmp_path):
)

with pytest.raises(ConversationOwnershipLostError):
primary_state.events.append(
primary_state.append_event(
ObservationEvent(
observation=TerminalObservation.from_text(
"late result",
Expand Down
2 changes: 1 addition & 1 deletion tests/cross/test_agent_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def test_conversation_fails_when_used_tool_is_missing():
),
llm_response_id="test-response-1",
)
conversation.state.events.append(action_event)
conversation.state.append_event(action_event)

conversation_id = conversation.state.id
del conversation
Expand Down
Loading
Loading