diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index efafbe69c9..85e900fa0f 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -634,6 +634,10 @@ def step( "triggering condensation retry with condensed history: " f"{e}" ) + # The incremental view may itself be the source of the + # malformed history. Re-derive with full enforcement so + # the condenser operates on a clean view. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( @@ -769,6 +773,9 @@ async def astep( "history: %s", e, ) + # Mirror step(): re-derive the cached view with full + # enforcement before the condensation retry. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( diff --git a/openhands-sdk/openhands/sdk/context/condenser/base.py b/openhands-sdk/openhands/sdk/context/condenser/base.py index 22333bb36b..6afed0b2c3 100644 --- a/openhands-sdk/openhands/sdk/context/condenser/base.py +++ b/openhands-sdk/openhands/sdk/context/condenser/base.py @@ -39,6 +39,11 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat Args: view: A view of the history containing all events that should be condensed. + **Implementations must treat this view as read-only.** The view may be + a cached projection owned by ``ConversationState`` + (see https://github.com/OpenHands/software-agent-sdk/issues/3053), and + mutating it in place will corrupt that cache. Return a new ``View`` + (e.g. ``View(events=view.events[k:])``) or a ``Condensation`` instead. agent_llm: LLM instance used by the agent. Condensers use this for token counting purposes. Defaults to None. 
diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index a6bb1ec60e..a1da4bb588 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -408,6 +408,9 @@ def fork( # immutable. for event in self._state.events: fork_conv._state.events.append(event.model_copy(deep=True)) + # Full rebuild: the copied events may need property enforcement + # (same posture as cold load). + fork_conv._state.rebuild_view() # Copy runtime state that accumulated during the source # conversation. activated_knowledge_skills is list[str] – strings diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 7e137929f2..ea14d21910 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -1,5 +1,6 @@ # state.py import json +import threading from collections.abc import Callable, Sequence from contextlib import AbstractContextManager from enum import Enum @@ -9,6 +10,7 @@ from pydantic import Field, PrivateAttr from openhands.sdk.agent.base import AgentBase +from openhands.sdk.context.view import View from openhands.sdk.conversation.conversation_stats import ConversationStats from openhands.sdk.conversation.event_store import EventLog from openhands.sdk.conversation.fifo_lock import FIFOLock @@ -205,6 +207,12 @@ class ConversationState(OpenHandsModel): # ===== Private attrs (NOT Fields) ===== _fs: FileStore = PrivateAttr() # filestore for persistence _events: EventLog = PrivateAttr() # now the storage for events + # Cached projection of `_events`, lazily updated on read via a + # watermark. Derived state — never persisted, never serialized. + # See https://github.com/OpenHands/software-agent-sdk/issues/3053. 
+ _view: View = PrivateAttr(default_factory=View) + _view_watermark: int = PrivateAttr(default=0) + _view_lock: threading.RLock = PrivateAttr(default_factory=threading.RLock) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption _autosave_enabled: bool = PrivateAttr( default=False @@ -225,6 +233,66 @@ class ConversationState(OpenHandsModel): def events(self) -> EventLog: return self._events + @property + def view(self) -> View: + """Lazily-updated, incrementally-maintained ``View`` of the events. + + The view is brought up to date by replaying only the events + appended since the last read (tracked by an internal watermark). + This is O(k) where k is the number of new events — typically 2–4 + per agent step — rather than O(n) over the entire history. + + ``enforce_properties`` is *not* run on the incremental path. + Full enforcement runs in ``rebuild_view()`` (cold load, fork, error + recovery) and in the ``View.from_events`` fallback on append failure. + + Callers must treat the returned view as read-only. This + reference is also invalidated by any call to ``rebuild_view()``; + re-read ``state.view`` after any rebuild if you need a fresh + snapshot. + """ + with self._view_lock: + n = len(self._events) + for i in range(self._view_watermark, n): + try: + self._view.append_event(self._events[i]) + self._view_watermark = i + 1 + except Exception: + logger.warning( + "Incremental view append failed at index %d; " + "rebuilding from scratch.", + i, + exc_info=True, + ) + self._view = View.from_events(self._events) + self._view_watermark = len(self._events) + break + return self._view + + def rebuild_view(self) -> None: + """Re-derive the cached view from the full event log. + + Runs ``View.from_events`` which applies all view-property + enforcement. This is the recovery / cold-load path described + in ``ViewPropertyBase`` and should be called only on: + + - Cold load (resuming a persisted ``ConversationState``). 
+ - Fork creation, after deep-copying events from the source. + - Explicit error recovery (e.g. malformed-history retry). + + Any ``View`` reference previously returned by ``state.view`` + is invalidated after this call and must not be used — it + will never reflect new events or the rebuilt state. + + If ``View.from_events`` raises (e.g. due to corrupted events), + the cache is left unchanged and the exception propagates to + the caller. ``state.view`` continues to serve the pre-rebuild + state until a successful ``rebuild_view()`` call. + """ + with self._view_lock: + self._view = View.from_events(self._events) + self._view_watermark = len(self._events) + @property def env_observation_persistence_dir(self) -> str | None: """Directory for persisting environment observation files.""" @@ -355,6 +423,11 @@ def create( state._events = EventLog(file_store, dir_path=EVENTS_DIR) state._cipher = cipher + # Cold-load: rebuild the cached view with full property + # enforcement — persisted events may come from an older code + # version or be corrupted. 
+ state.rebuild_view() + # Verify compatibility (agent class + tools) agent.verify(state.agent, events=state._events) diff --git a/tests/sdk/agent/test_agent_context_window_condensation.py b/tests/sdk/agent/test_agent_context_window_condensation.py index 28beda25d1..ccf8d9ae59 100644 --- a/tests/sdk/agent/test_agent_context_window_condensation.py +++ b/tests/sdk/agent/test_agent_context_window_condensation.py @@ -1,4 +1,5 @@ from typing import TYPE_CHECKING +from unittest.mock import patch import pytest from pydantic import PrivateAttr @@ -52,12 +53,24 @@ def completion(self, *, messages, tools=None, **kwargs): # type: ignore[overrid "immediately after" ) + async def acompletion(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + def responses(self, *, messages, tools=None, **kwargs): # type: ignore[override] raise LLMMalformedConversationHistoryError( "messages.134: `tool_use` ids were found without `tool_result` blocks " "immediately after" ) + async def aresponses(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + class HandlesRequestsCondenser(CondenserBase): def condense( @@ -178,6 +191,51 @@ def test_agent_logs_warning_when_no_condenser_on_ctx_exceeded( assert any("test-model" in record.message for record in caplog.records) +@pytest.mark.parametrize("force_responses", [True, False]) +def test_agent_rebuilds_view_on_malformed_history_recovery( + force_responses: bool, +): + """rebuild_view is called before CondensationRequest on malformed history.""" + llm = MalformedHistoryRaisingLLM(force_responses=force_responses) + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + 
convo._ensure_agent_ready() + + seen: list = [] + with patch.object( + type(convo._state), + "rebuild_view", + wraps=convo._state.rebuild_view, + ) as mock_rebuild: + agent.step(convo, on_event=lambda e: seen.append(e)) + assert mock_rebuild.call_count == 1 + + assert any(isinstance(e, CondensationRequest) for e in seen) + + +@pytest.mark.parametrize("force_responses", [True, False]) +@pytest.mark.asyncio +async def test_agent_rebuilds_view_on_malformed_history_recovery_async( + force_responses: bool, +): + """Async parity: astep calls rebuild_view before condensation retry.""" + llm = MalformedHistoryRaisingLLM(force_responses=force_responses) + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + convo._ensure_agent_ready() + + seen: list = [] + with patch.object( + type(convo._state), + "rebuild_view", + wraps=convo._state.rebuild_view, + ) as mock_rebuild: + await agent.astep(convo, on_event=lambda e: seen.append(e)) + assert mock_rebuild.call_count == 1 + + assert any(isinstance(e, CondensationRequest) for e in seen) + + class NoHandlesRequestsCondenser(CondenserBase): """A condenser that doesn't handle condensation requests.""" diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py new file mode 100644 index 0000000000..6295aaa1bb --- /dev/null +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -0,0 +1,231 @@ +"""Tests for the lazily-updated, watermark-based View cache on ConversationState.""" + +import uuid +from unittest.mock import patch + +import pytest +from pydantic import SecretStr + +from openhands.sdk.agent import Agent +from openhands.sdk.context.view import View +from openhands.sdk.conversation.state import ConversationState +from openhands.sdk.event.condenser import Condensation, CondensationRequest +from openhands.sdk.event.llm_convertible import MessageEvent +from openhands.sdk.llm import LLM +from openhands.sdk.llm.message 
import Message, TextContent +from openhands.sdk.workspace import LocalWorkspace + + +@pytest.fixture +def state(tmp_path): + """Create a minimal ConversationState backed by a temp directory.""" + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("k"), usage_id="test") + agent = Agent(llm=llm, tools=[]) + working_dir = tmp_path / "work" + working_dir.mkdir() + return ConversationState.create( + id=uuid.uuid4(), + agent=agent, + workspace=LocalWorkspace(working_dir=str(working_dir)), + persistence_dir=str(tmp_path / "persist"), + ) + + +def _msg(text: str) -> MessageEvent: + return MessageEvent( + llm_message=Message(role="user", content=[TextContent(text=text)]), + source="user", + ) + + +def test_fresh_state_has_empty_view(state): + view = state.view + assert isinstance(view, View) + assert len(view.events) == 0 + + +def test_view_updates_after_event_append(state): + m = _msg("hello") + state.events.append(m) + assert len(state.view.events) == 1 + assert state.view.events[0].id == m.id + + +def test_view_tracks_multiple_appends(state): + msgs = [_msg(f"msg-{i}") for i in range(5)] + for m in msgs: + state.events.append(m) + assert len(state.view.events) == 5 + assert [e.id for e in state.view.events] == [m.id for m in msgs] + + +def test_view_matches_full_rebuild(state): + """Incremental view must produce the same result as View.from_events.""" + for i in range(10): + state.events.append(_msg(f"msg-{i}")) + incremental_ids = [e.id for e in state.view.events] + full_ids = [e.id for e in View.from_events(state.events).events] + assert incremental_ids == full_ids + + +def test_view_matches_full_rebuild_with_condensation(state): + """Parity holds on sequences that include a Condensation.""" + for i in range(5): + state.events.append(_msg(f"msg-{i}")) + condensation = Condensation( + forgotten_event_ids={state.events[0].id}, + summary="drop first", + llm_response_id="test-resp", + ) + state.events.append(condensation) + incremental_ids = [e.id for e in state.view.events] 
+ full_ids = [e.id for e in View.from_events(state.events).events] + assert incremental_ids == full_ids + + +def test_condensation_applied_incrementally(state): + m1 = _msg("first") + m2 = _msg("second") + m3 = _msg("third") + state.events.append(m1) + state.events.append(m2) + state.events.append(m3) + assert len(state.view.events) == 3 + + condensation = Condensation( + forgotten_event_ids={m1.id}, + summary="dropped first", + llm_response_id="test-resp", + ) + state.events.append(condensation) + # View should reflect the condensation: m1 removed + assert len(state.view.events) == 2 + remaining_ids = {e.id for e in state.view.events} + assert m1.id not in remaining_ids + assert m2.id in remaining_ids + assert m3.id in remaining_ids + + +def test_condensation_request_sets_flag(state): + state.events.append(_msg("x")) + state.events.append(CondensationRequest()) + assert state.view.unhandled_condensation_request is True + + +def test_hot_path_does_not_call_enforce_properties(state): + """Normal incremental appends must never invoke enforce_properties.""" + call_count = 0 + original = View.enforce_properties + + def counting_enforce(self, all_events): + nonlocal call_count + call_count += 1 + return original(self, all_events) + + with patch.object(View, "enforce_properties", counting_enforce): + for i in range(10): + state.events.append(_msg(f"msg-{i}")) + _ = state.view # force lazy catch-up + + assert call_count == 0 + + +def test_rebuild_view_runs_enforce_properties(state): + """rebuild_view must invoke enforce_properties (via View.from_events).""" + state.events.append(_msg("a")) + call_count = 0 + original = View.enforce_properties + + def counting_enforce(self, all_events): + nonlocal call_count + call_count += 1 + return original(self, all_events) + + with patch.object(View, "enforce_properties", counting_enforce): + state.rebuild_view() + + assert call_count >= 1 + + +def test_rebuild_view_replaces_cached_instance(state): + state.events.append(_msg("a")) + _ 
= state.view # populate cache + old_view = state.view + + state.rebuild_view() + assert state.view is not old_view + assert len(state.view.events) == 1 + + +def test_rebuild_view_matches_from_events(state): + for i in range(5): + state.events.append(_msg(f"msg-{i}")) + state.events.append( + Condensation( + forgotten_event_ids={state.events[0].id}, + summary="drop", + llm_response_id="test-resp", + ) + ) + state.rebuild_view() + rebuilt_ids = [e.id for e in state.view.events] + fresh_ids = [e.id for e in View.from_events(state.events).events] + assert rebuilt_ids == fresh_ids + + +def test_view_idempotent_on_repeated_reads(state): + """Reading the view multiple times without new events is a no-op.""" + state.events.append(_msg("x")) + v1 = state.view + v2 = state.view + assert v1 is v2 + assert len(v1.events) == 1 + + +def test_resume_path_rebuilds_view(tmp_path): + """Resuming a persisted ConversationState cold-loads the view.""" + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("k"), usage_id="test") + agent = Agent(llm=llm, tools=[]) + working_dir = tmp_path / "work" + working_dir.mkdir() + persist = str(tmp_path / "persist") + + # Create and populate + state = ConversationState.create( + id=uuid.uuid4(), + agent=agent, + workspace=LocalWorkspace(working_dir=str(working_dir)), + persistence_dir=persist, + ) + state.events.append(_msg("persisted-msg")) + cid = state.id + + # Resume into a fresh state + resumed = ConversationState.create( + id=cid, + agent=agent, + workspace=LocalWorkspace(working_dir=str(working_dir)), + persistence_dir=persist, + ) + assert len(resumed.view.events) == 1 + assert resumed.view.events[0].id == state.events[0].id + + # Verify the watermark is correctly set so incremental reads work + # post-resume (guards against rebuild_view leaving watermark at 0). 
+ resumed.events.append(_msg("after-resume")) + assert len(resumed.view.events) == 2 + + +def test_rebuild_view_leaves_cache_unchanged_on_error(state): + """If View.from_events raises, the old cache and watermark are preserved.""" + state.events.append(_msg("pre-error")) + _ = state.view # populate the incremental cache + old_view = state._view + old_watermark = state._view_watermark + + with patch.object(View, "from_events", side_effect=RuntimeError("boom")): + with pytest.raises(RuntimeError, match="boom"): + state.rebuild_view() + + assert state._view is old_view + assert state._view_watermark == old_watermark