From 91d20cba325c9ed9af689e2af3a1e520cd45a883 Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Wed, 20 May 2026 15:26:09 -0600 Subject: [PATCH 1/8] Lazy watermark-based View cache on ConversationState MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refs: #3053 Alternative to the push-based callback approach in #3324. Instead of eagerly syncing the cached View via EventLog._on_append callbacks, the View is lazily brought up to date on read using an integer watermark that tracks how many events have been incorporated. Changes: - state.py: _view, _view_watermark, view property, rebuild_view() - agent.py: rebuild_view() in step/astep malformed-history handlers - local_conversation.py: rebuild_view() after fork event copy - condenser/base.py: docstring read-only warning Files NOT changed (unlike #3324): - event_store.py: no _on_append callback, no set_on_append() - local_conversation.py: _default_callback unchanged, close() unchanged, condense() unchanged — no lock-wrapping changes needed Co-authored-by: openhands --- openhands-sdk/openhands/sdk/agent/agent.py | 7 + .../openhands/sdk/context/condenser/base.py | 5 + .../conversation/impl/local_conversation.py | 3 + .../openhands/sdk/conversation/state.py | 47 +++++ .../test_agent_context_window_condensation.py | 61 ++++++ .../sdk/conversation/test_state_view_cache.py | 196 ++++++++++++++++++ 6 files changed, 319 insertions(+) create mode 100644 tests/sdk/conversation/test_state_view_cache.py diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index efafbe69c9..85e900fa0f 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -634,6 +634,10 @@ def step( "triggering condensation retry with condensed history: " f"{e}" ) + # The incremental view may itself be the source of the + # malformed history. Re-derive with full enforcement so + # the condenser operates on a clean view. 
+ state.rebuild_view() on_event(CondensationRequest()) return logger.warning( @@ -769,6 +773,9 @@ async def astep( "history: %s", e, ) + # Mirror step(): re-derive the cached view with full + # enforcement before the condensation retry. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( diff --git a/openhands-sdk/openhands/sdk/context/condenser/base.py b/openhands-sdk/openhands/sdk/context/condenser/base.py index 22333bb36b..6afed0b2c3 100644 --- a/openhands-sdk/openhands/sdk/context/condenser/base.py +++ b/openhands-sdk/openhands/sdk/context/condenser/base.py @@ -39,6 +39,11 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat Args: view: A view of the history containing all events that should be condensed. + **Implementations must treat this view as read-only.** The view may be + a cached projection owned by ``ConversationState`` + (see https://github.com/OpenHands/software-agent-sdk/issues/3053), and + mutating it in place will corrupt that cache. Return a new ``View`` + (e.g. ``View(events=view.events[k:])``) or a ``Condensation`` instead. agent_llm: LLM instance used by the agent. Condensers use this for token counting purposes. Defaults to None. diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index fd65dca465..506d0da394 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -407,6 +407,9 @@ def fork( # immutable. for event in self._state.events: fork_conv._state.events.append(event.model_copy(deep=True)) + # Full rebuild: the copied events may need property enforcement + # (same posture as cold load). + fork_conv._state.rebuild_view() # Copy runtime state that accumulated during the source # conversation. 
activated_knowledge_skills is list[str] – strings diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 7e137929f2..4f64aa65cf 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -9,6 +9,7 @@ from pydantic import Field, PrivateAttr from openhands.sdk.agent.base import AgentBase +from openhands.sdk.context.view import View from openhands.sdk.conversation.conversation_stats import ConversationStats from openhands.sdk.conversation.event_store import EventLog from openhands.sdk.conversation.fifo_lock import FIFOLock @@ -205,6 +206,11 @@ class ConversationState(OpenHandsModel): # ===== Private attrs (NOT Fields) ===== _fs: FileStore = PrivateAttr() # filestore for persistence _events: EventLog = PrivateAttr() # now the storage for events + # Cached projection of `_events`, lazily updated on read via a + # watermark. Derived state — never persisted, never serialized. + # See https://github.com/OpenHands/software-agent-sdk/issues/3053. + _view: View = PrivateAttr(default_factory=View) + _view_watermark: int = PrivateAttr(default=0) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption _autosave_enabled: bool = PrivateAttr( default=False @@ -225,6 +231,42 @@ class ConversationState(OpenHandsModel): def events(self) -> EventLog: return self._events + @property + def view(self) -> View: + """Lazily-updated, incrementally-maintained ``View`` of the events. + + The view is brought up to date by replaying only the events + appended since the last read (tracked by an internal watermark). + This is O(k) where k is the number of new events — typically 2–4 + per agent step — rather than O(n) over the entire history. + + ``enforce_properties`` is *not* run on the incremental path. + Full enforcement happens only via ``rebuild_view()``, which is + called on cold load, fork, and error recovery. 
+ + Callers must treat the returned view as read-only. + """ + n = len(self._events) + if n > self._view_watermark: + for i in range(self._view_watermark, n): + self._view.append_event(self._events[i]) + self._view_watermark = n + return self._view + + def rebuild_view(self) -> None: + """Re-derive the cached view from the full event log. + + Runs ``View.from_events`` which applies all view-property + enforcement. This is the recovery / cold-load path described + in ``ViewPropertyBase`` and should be called only on: + + - Cold load (resuming a persisted ``ConversationState``). + - Fork creation, after deep-copying events from the source. + - Explicit error recovery (e.g. malformed-history retry). + """ + self._view = View.from_events(self._events) + self._view_watermark = len(self._events) + @property def env_observation_persistence_dir(self) -> str | None: """Directory for persisting environment observation files.""" @@ -355,6 +397,11 @@ def create( state._events = EventLog(file_store, dir_path=EVENTS_DIR) state._cipher = cipher + # Cold-load: rebuild the cached view with full property + # enforcement — persisted events may come from an older code + # version or be corrupted. 
+ state.rebuild_view() + # Verify compatibility (agent class + tools) agent.verify(state.agent, events=state._events) diff --git a/tests/sdk/agent/test_agent_context_window_condensation.py b/tests/sdk/agent/test_agent_context_window_condensation.py index 28beda25d1..c15442389c 100644 --- a/tests/sdk/agent/test_agent_context_window_condensation.py +++ b/tests/sdk/agent/test_agent_context_window_condensation.py @@ -52,12 +52,24 @@ def completion(self, *, messages, tools=None, **kwargs): # type: ignore[overrid "immediately after" ) + async def acompletion(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + def responses(self, *, messages, tools=None, **kwargs): # type: ignore[override] raise LLMMalformedConversationHistoryError( "messages.134: `tool_use` ids were found without `tool_result` blocks " "immediately after" ) + async def aresponses(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + class HandlesRequestsCondenser(CondenserBase): def condense( @@ -178,6 +190,55 @@ def test_agent_logs_warning_when_no_condenser_on_ctx_exceeded( assert any("test-model" in record.message for record in caplog.records) +@pytest.mark.parametrize("force_responses", [True, False]) +def test_agent_rebuilds_view_on_malformed_history_recovery( + force_responses: bool, +): + """rebuild_view is called before CondensationRequest on malformed history.""" + from unittest.mock import patch + + llm = MalformedHistoryRaisingLLM(force_responses=force_responses) + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + convo._ensure_agent_ready() + + seen: list = [] + with patch.object( + type(convo._state), + 
"rebuild_view", + wraps=convo._state.rebuild_view, + ) as mock_rebuild: + agent.step(convo, on_event=lambda e: seen.append(e)) + assert mock_rebuild.call_count == 1 + + assert any(isinstance(e, CondensationRequest) for e in seen) + + +@pytest.mark.parametrize("force_responses", [True, False]) +@pytest.mark.asyncio +async def test_agent_rebuilds_view_on_malformed_history_recovery_async( + force_responses: bool, +): + """Async parity: astep calls rebuild_view before condensation retry.""" + from unittest.mock import patch + + llm = MalformedHistoryRaisingLLM(force_responses=force_responses) + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + convo._ensure_agent_ready() + + seen: list = [] + with patch.object( + type(convo._state), + "rebuild_view", + wraps=convo._state.rebuild_view, + ) as mock_rebuild: + await agent.astep(convo, on_event=lambda e: seen.append(e)) + assert mock_rebuild.call_count == 1 + + assert any(isinstance(e, CondensationRequest) for e in seen) + + class NoHandlesRequestsCondenser(CondenserBase): """A condenser that doesn't handle condensation requests.""" diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py new file mode 100644 index 0000000000..b9acc21682 --- /dev/null +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -0,0 +1,196 @@ +"""Tests for the lazily-updated, watermark-based View cache on ConversationState.""" + +import uuid +from unittest.mock import patch + +import pytest +from pydantic import SecretStr + +from openhands.sdk.agent import Agent +from openhands.sdk.context.view import View +from openhands.sdk.conversation.state import ConversationState +from openhands.sdk.event.condenser import Condensation, CondensationRequest +from openhands.sdk.event.llm_convertible import MessageEvent +from openhands.sdk.llm import LLM +from openhands.sdk.llm.message import Message, TextContent +from openhands.sdk.workspace 
import LocalWorkspace + + +@pytest.fixture +def state(tmp_path): + """Create a minimal ConversationState backed by a temp directory.""" + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("k"), usage_id="test") + agent = Agent(llm=llm, tools=[]) + working_dir = tmp_path / "work" + working_dir.mkdir() + return ConversationState.create( + id=uuid.uuid4(), + agent=agent, + workspace=LocalWorkspace(working_dir=str(working_dir)), + persistence_dir=str(tmp_path / "persist"), + ) + + +def _msg(text: str) -> MessageEvent: + return MessageEvent( + llm_message=Message(role="user", content=[TextContent(text=text)]), + source="user", + ) + + +def test_fresh_state_has_empty_view(state): + view = state.view + assert isinstance(view, View) + assert len(view.events) == 0 + + +def test_view_updates_after_event_append(state): + m = _msg("hello") + state.events.append(m) + assert len(state.view.events) == 1 + assert state.view.events[0].id == m.id + + +def test_view_tracks_multiple_appends(state): + msgs = [_msg(f"msg-{i}") for i in range(5)] + for m in msgs: + state.events.append(m) + assert len(state.view.events) == 5 + assert [e.id for e in state.view.events] == [m.id for m in msgs] + + +def test_view_matches_full_rebuild(state): + """Incremental view must produce the same result as View.from_events.""" + for i in range(10): + state.events.append(_msg(f"msg-{i}")) + incremental_ids = [e.id for e in state.view.events] + full_ids = [e.id for e in View.from_events(state.events).events] + assert incremental_ids == full_ids + + +def test_condensation_applied_incrementally(state): + m1 = _msg("first") + m2 = _msg("second") + m3 = _msg("third") + state.events.append(m1) + state.events.append(m2) + state.events.append(m3) + assert len(state.view.events) == 3 + + condensation = Condensation( + forgotten_event_ids={m1.id}, + summary="dropped first", + llm_response_id="test-resp", + ) + state.events.append(condensation) + # View should reflect the condensation: m1 removed + assert 
len(state.view.events) == 2 + remaining_ids = {e.id for e in state.view.events} + assert m1.id not in remaining_ids + assert m2.id in remaining_ids + assert m3.id in remaining_ids + + +def test_condensation_request_sets_flag(state): + state.events.append(_msg("x")) + state.events.append(CondensationRequest()) + assert state.view.unhandled_condensation_request is True + + +def test_hot_path_does_not_call_enforce_properties(state): + """Normal incremental appends must never invoke enforce_properties.""" + call_count = 0 + original = View.enforce_properties + + def counting_enforce(self, all_events): + nonlocal call_count + call_count += 1 + return original(self, all_events) + + with patch.object(View, "enforce_properties", counting_enforce): + for i in range(10): + state.events.append(_msg(f"msg-{i}")) + _ = state.view # force lazy catch-up + + assert call_count == 0 + + +def test_rebuild_view_runs_enforce_properties(state): + """rebuild_view must invoke enforce_properties (via View.from_events).""" + state.events.append(_msg("a")) + call_count = 0 + original = View.enforce_properties + + def counting_enforce(self, all_events): + nonlocal call_count + call_count += 1 + return original(self, all_events) + + with patch.object(View, "enforce_properties", counting_enforce): + state.rebuild_view() + + assert call_count >= 1 + + +def test_rebuild_view_replaces_cached_instance(state): + state.events.append(_msg("a")) + _ = state.view # populate cache + old_view = state.view + + state.rebuild_view() + assert state.view is not old_view + assert len(state.view.events) == 1 + + +def test_rebuild_view_matches_from_events(state): + for i in range(5): + state.events.append(_msg(f"msg-{i}")) + state.events.append( + Condensation( + forgotten_event_ids={state.events[0].id}, + summary="drop", + llm_response_id="test-resp", + ) + ) + state.rebuild_view() + rebuilt_ids = [e.id for e in state.view.events] + fresh_ids = [e.id for e in View.from_events(state.events).events] + assert 
rebuilt_ids == fresh_ids + + +def test_view_idempotent_on_repeated_reads(state): + """Reading the view multiple times without new events is a no-op.""" + state.events.append(_msg("x")) + v1 = state.view + v2 = state.view + assert v1 is v2 + assert len(v1.events) == 1 + + +def test_resume_path_rebuilds_view(tmp_path): + """Resuming a persisted ConversationState cold-loads the view.""" + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("k"), usage_id="test") + agent = Agent(llm=llm, tools=[]) + working_dir = tmp_path / "work" + working_dir.mkdir() + persist = str(tmp_path / "persist") + + # Create and populate + state = ConversationState.create( + id=uuid.uuid4(), + agent=agent, + workspace=LocalWorkspace(working_dir=str(working_dir)), + persistence_dir=persist, + ) + state.events.append(_msg("persisted-msg")) + cid = state.id + + # Resume into a fresh state + resumed = ConversationState.create( + id=cid, + agent=agent, + workspace=LocalWorkspace(working_dir=str(working_dir)), + persistence_dir=persist, + ) + assert len(resumed.view.events) == 1 + assert resumed.view.events[0].id == state.events[0].id From eb08421c81f8aae487559a4a8255035b471f95ed Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Thu, 21 May 2026 07:57:16 -0600 Subject: [PATCH 2/8] Address review: lock view cache, stale-ref docstring, test extension - Guard view property and rebuild_view() with a threading.Lock so concurrent readers cannot double-append into the cached View. - Document in rebuild_view() that previously returned View references are invalidated after the call. - Extend test_resume_path_rebuilds_view to verify the watermark is correctly positioned after resume by appending an event and checking it appears incrementally. 
Co-authored-by: openhands --- .../openhands/sdk/conversation/state.py | 24 ++++++++++++------- .../sdk/conversation/test_state_view_cache.py | 5 ++++ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 4f64aa65cf..308dff656a 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -1,5 +1,6 @@ # state.py import json +import threading from collections.abc import Callable, Sequence from contextlib import AbstractContextManager from enum import Enum @@ -211,6 +212,7 @@ class ConversationState(OpenHandsModel): # See https://github.com/OpenHands/software-agent-sdk/issues/3053. _view: View = PrivateAttr(default_factory=View) _view_watermark: int = PrivateAttr(default=0) + _view_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption _autosave_enabled: bool = PrivateAttr( default=False @@ -246,12 +248,13 @@ def view(self) -> View: Callers must treat the returned view as read-only. """ - n = len(self._events) - if n > self._view_watermark: - for i in range(self._view_watermark, n): - self._view.append_event(self._events[i]) - self._view_watermark = n - return self._view + with self._view_lock: + n = len(self._events) + if n > self._view_watermark: + for i in range(self._view_watermark, n): + self._view.append_event(self._events[i]) + self._view_watermark = n + return self._view def rebuild_view(self) -> None: """Re-derive the cached view from the full event log. @@ -263,9 +266,14 @@ def rebuild_view(self) -> None: - Cold load (resuming a persisted ``ConversationState``). - Fork creation, after deep-copying events from the source. - Explicit error recovery (e.g. malformed-history retry). 
+ + Any ``View`` reference previously returned by ``state.view`` + is invalidated after this call and must not be used — it + will never reflect new events or the rebuilt state. """ - self._view = View.from_events(self._events) - self._view_watermark = len(self._events) + with self._view_lock: + self._view = View.from_events(self._events) + self._view_watermark = len(self._events) @property def env_observation_persistence_dir(self) -> str | None: diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py index b9acc21682..648756e970 100644 --- a/tests/sdk/conversation/test_state_view_cache.py +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -194,3 +194,8 @@ def test_resume_path_rebuilds_view(tmp_path): ) assert len(resumed.view.events) == 1 assert resumed.view.events[0].id == state.events[0].id + + # Verify the watermark is correctly set so incremental reads work + # post-resume (guards against rebuild_view leaving watermark at 0). + resumed.events.append(_msg("after-resume")) + assert len(resumed.view.events) == 2 From 20f74d3a98e6e3407f64fd1b38d4f7839b22677c Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Thu, 21 May 2026 10:23:07 -0600 Subject: [PATCH 3/8] Review round 2: RLock, snapshot rebuild, hoist test import - Switch _view_lock from threading.Lock to threading.RLock so future reentrant paths (e.g. ViewProperty touching state.view) cannot deadlock silently. - Snapshot events in rebuild_view() before building the View so the watermark is tied to exactly the events that were materialized, closing a theoretical TOCTOU race with concurrent appenders. - Move 'from unittest.mock import patch' to module-level in test_agent_context_window_condensation.py (was duplicated in two function bodies with no circular-import justification). 
Co-authored-by: openhands --- openhands-sdk/openhands/sdk/conversation/state.py | 7 ++++--- tests/sdk/agent/test_agent_context_window_condensation.py | 5 +---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 308dff656a..c57b84a357 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -212,7 +212,7 @@ class ConversationState(OpenHandsModel): # See https://github.com/OpenHands/software-agent-sdk/issues/3053. _view: View = PrivateAttr(default_factory=View) _view_watermark: int = PrivateAttr(default=0) - _view_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) + _view_lock: threading.RLock = PrivateAttr(default_factory=threading.RLock) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption _autosave_enabled: bool = PrivateAttr( default=False @@ -272,8 +272,9 @@ def rebuild_view(self) -> None: will never reflect new events or the rebuilt state. 
""" with self._view_lock: - self._view = View.from_events(self._events) - self._view_watermark = len(self._events) + snapshot = list(self._events) + self._view = View.from_events(snapshot) + self._view_watermark = len(snapshot) @property def env_observation_persistence_dir(self) -> str | None: diff --git a/tests/sdk/agent/test_agent_context_window_condensation.py b/tests/sdk/agent/test_agent_context_window_condensation.py index c15442389c..ccf8d9ae59 100644 --- a/tests/sdk/agent/test_agent_context_window_condensation.py +++ b/tests/sdk/agent/test_agent_context_window_condensation.py @@ -1,4 +1,5 @@ from typing import TYPE_CHECKING +from unittest.mock import patch import pytest from pydantic import PrivateAttr @@ -195,8 +196,6 @@ def test_agent_rebuilds_view_on_malformed_history_recovery( force_responses: bool, ): """rebuild_view is called before CondensationRequest on malformed history.""" - from unittest.mock import patch - llm = MalformedHistoryRaisingLLM(force_responses=force_responses) agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) convo = Conversation(agent=agent) @@ -220,8 +219,6 @@ async def test_agent_rebuilds_view_on_malformed_history_recovery_async( force_responses: bool, ): """Async parity: astep calls rebuild_view before condensation retry.""" - from unittest.mock import patch - llm = MalformedHistoryRaisingLLM(force_responses=force_responses) agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) convo = Conversation(agent=agent) From 26711fff417a690c3c73f7936e11e1af232d27b4 Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Thu, 21 May 2026 15:21:38 -0600 Subject: [PATCH 4/8] Review round 3: per-event watermark advance, rebuild_view exception docstring - Advance _view_watermark after each successful append_event in the view property loop, so a mid-loop failure cannot leave already- appended events eligible for replay on the next read. 
- Document the implicit exception-safety of rebuild_view(): if View.from_events raises, the cache is left unchanged. - Decline instance-level patch.object suggestion: Pydantic's __setattr__ prevents setting arbitrary attributes on model instances, so class-level patching is the correct approach here. Co-authored-by: openhands --- openhands-sdk/openhands/sdk/conversation/state.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index c57b84a357..9674cba50a 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -250,10 +250,9 @@ def view(self) -> View: """ with self._view_lock: n = len(self._events) - if n > self._view_watermark: - for i in range(self._view_watermark, n): - self._view.append_event(self._events[i]) - self._view_watermark = n + for i in range(self._view_watermark, n): + self._view.append_event(self._events[i]) + self._view_watermark = i + 1 return self._view def rebuild_view(self) -> None: @@ -270,6 +269,11 @@ def rebuild_view(self) -> None: Any ``View`` reference previously returned by ``state.view`` is invalidated after this call and must not be used — it will never reflect new events or the rebuilt state. + + If ``View.from_events`` raises (e.g. due to corrupted events), + the cache is left unchanged and the exception propagates to + the caller. ``state.view`` continues to serve the pre-rebuild + state until a successful ``rebuild_view()`` call. """ with self._view_lock: snapshot = list(self._events) From 34ab59189a89c11778e4e071d132e83362795219 Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Fri, 22 May 2026 09:57:42 -0600 Subject: [PATCH 5/8] Add incremental-vs-full parity test with Condensation Covers the gap where test_view_matches_full_rebuild only used MessageEvents. 
The new test asserts state.view == View.from_events on a mixed sequence that includes a Condensation event. Co-authored-by: openhands --- tests/sdk/conversation/test_state_view_cache.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py index 648756e970..373aca5d63 100644 --- a/tests/sdk/conversation/test_state_view_cache.py +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -68,6 +68,21 @@ def test_view_matches_full_rebuild(state): assert incremental_ids == full_ids +def test_view_matches_full_rebuild_with_condensation(state): + """Parity holds on sequences that include a Condensation.""" + for i in range(5): + state.events.append(_msg(f"msg-{i}")) + condensation = Condensation( + forgotten_event_ids={state.events[0].id}, + summary="drop first", + llm_response_id="test-resp", + ) + state.events.append(condensation) + incremental_ids = [e.id for e in state.view.events] + full_ids = [e.id for e in View.from_events(state.events).events] + assert incremental_ids == full_ids + + def test_condensation_applied_incrementally(state): m1 = _msg("first") m2 = _msg("second") From a6ea6c65e552ac866562a587527f00fbe3714b5b Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Fri, 22 May 2026 10:32:35 -0600 Subject: [PATCH 6/8] Drop unnecessary list snapshot in rebuild_view The snapshot was added to guard a theoretical TOCTOU race between from_events iteration and watermark assignment, but rebuild_view holds the view lock and event appends happen on the same thread. Passing self._events directly is simpler and avoids the copy. 
Co-authored-by: openhands --- openhands-sdk/openhands/sdk/conversation/state.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 9674cba50a..3f4440e876 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -276,9 +276,8 @@ def rebuild_view(self) -> None: state until a successful ``rebuild_view()`` call. """ with self._view_lock: - snapshot = list(self._events) - self._view = View.from_events(snapshot) - self._view_watermark = len(snapshot) + self._view = View.from_events(self._events) + self._view_watermark = len(self._events) @property def env_observation_persistence_dir(self) -> str | None: From f550790d58668265101797a00baa4293cdd198bc Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Fri, 22 May 2026 12:42:50 -0600 Subject: [PATCH 7/8] Self-healing view fallback, docstring cross-ref, exception-safety test - view property: catch append_event failures and fall back to a full rebuild via View.from_events, so callers never see a stuck view. - view property docstring: cross-reference rebuild_view() invalidation (relevant once #3328 lands and callers hold view references across condensation cycles). - Add test_rebuild_view_leaves_cache_unchanged_on_error: verifies the documented contract that a failed rebuild preserves the old cache. 
Co-authored-by: openhands --- .../openhands/sdk/conversation/state.py | 16 +++++++++++++--- tests/sdk/conversation/test_state_view_cache.py | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 3f4440e876..56301864d6 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -246,13 +246,23 @@ def view(self) -> View: Full enforcement happens only via ``rebuild_view()``, which is called on cold load, fork, and error recovery. - Callers must treat the returned view as read-only. + Callers must treat the returned view as read-only. This + reference is also invalidated by any call to ``rebuild_view()``; + re-read ``state.view`` after any rebuild if you need a fresh + snapshot. """ with self._view_lock: n = len(self._events) for i in range(self._view_watermark, n): - self._view.append_event(self._events[i]) - self._view_watermark = i + 1 + try: + self._view.append_event(self._events[i]) + self._view_watermark = i + 1 + except Exception: + # Incremental append failed; rebuild from scratch + # with full enforcement to recover a consistent view. + self._view = View.from_events(self._events) + self._view_watermark = len(self._events) + break return self._view def rebuild_view(self) -> None: diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py index 373aca5d63..6295aaa1bb 100644 --- a/tests/sdk/conversation/test_state_view_cache.py +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -214,3 +214,18 @@ def test_resume_path_rebuilds_view(tmp_path): # post-resume (guards against rebuild_view leaving watermark at 0). 
resumed.events.append(_msg("after-resume")) assert len(resumed.view.events) == 2 + + +def test_rebuild_view_leaves_cache_unchanged_on_error(state): + """If View.from_events raises, the old cache and watermark are preserved.""" + state.events.append(_msg("pre-error")) + _ = state.view # populate the incremental cache + old_view = state._view + old_watermark = state._view_watermark + + with patch.object(View, "from_events", side_effect=RuntimeError("boom")): + with pytest.raises(RuntimeError, match="boom"): + state.rebuild_view() + + assert state._view is old_view + assert state._view_watermark == old_watermark From 86d75dfa653c55dbc7ca7133abbb985734f5c9ba Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Fri, 22 May 2026 12:54:20 -0600 Subject: [PATCH 8/8] Log warning on self-healing view rebuild fallback Without this, the broad `except Exception` handler silently swallows the root cause, making unexpected append_event failures undiagnosable in production. Co-authored-by: openhands --- openhands-sdk/openhands/sdk/conversation/state.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 56301864d6..ea14d21910 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -258,8 +258,12 @@ def view(self) -> View: self._view.append_event(self._events[i]) self._view_watermark = i + 1 except Exception: - # Incremental append failed; rebuild from scratch - # with full enforcement to recover a consistent view. + logger.warning( + "Incremental view append failed at index %d; " + "rebuilding from scratch.", + i, + exc_info=True, + ) self._view = View.from_events(self._events) self._view_watermark = len(self._events) break