From c9b7e779d0d3f53e194998993d0115d7c74805ea Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 20 May 2026 13:35:31 +0000 Subject: [PATCH 1/6] Add cached, incrementally-maintained View on ConversationState Introduces `ConversationState.view` as a derived-but-cached projection of the event log, updated in lockstep with appends via the new `ConversationState.append_event` write path. The view is fully re-derived (running `View.enforce_properties`) only via `ConversationState.rebuild_view`, which is invoked on cold load (resuming from persistence) and after fork event-copy. Hot-path agent steps no longer need to pay the `View.from_events` cost on every iteration once call sites are migrated in a follow-up PR. The default callback in `LocalConversation` and the fork event-copy now go through `state.append_event`, keeping the cached view in sync without introducing any behavioral change for existing callers (who read events via `state.events`). Also documents on `CondenserBase.condense` that implementations must treat the passed view as read-only, since it may now be a cached projection owned by `ConversationState`. Refs: #3053 Co-authored-by: openhands --- .../openhands/sdk/context/condenser/base.py | 6 + .../conversation/impl/local_conversation.py | 14 +- .../openhands/sdk/conversation/state.py | 67 +++++++ .../sdk/conversation/test_state_view_cache.py | 166 ++++++++++++++++++ 4 files changed, 250 insertions(+), 3 deletions(-) create mode 100644 tests/sdk/conversation/test_state_view_cache.py diff --git a/openhands-sdk/openhands/sdk/context/condenser/base.py b/openhands-sdk/openhands/sdk/context/condenser/base.py index 4b80ec8b85..ca16503fce 100644 --- a/openhands-sdk/openhands/sdk/context/condenser/base.py +++ b/openhands-sdk/openhands/sdk/context/condenser/base.py @@ -39,6 +39,12 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat Args: view: A view of the history containing all events that should be condensed. + **Implementations must treat this view as read-only.** The view may be + a cached projection owned by `ConversationState` + (see https://github.com/OpenHands/software-agent-sdk/issues/3053), and + mutating it in place will corrupt that cache and silently desynchronize + the conversation from its event log. Return a new `View` (e.g. + `View(events=view.events[k:])`) or a `Condensation` event instead. agent_llm: LLM instance used by the agent. Condensers use this for token counting purposes. Defaults to None. diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 0ab9dc1cb9..869959f7f8 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -198,7 +198,11 @@ def _default_callback(e): # This callback runs while holding the conversation state's lock # (see BaseConversation.compose_callbacks usage inside `with self._state:` # regions), so updating state here is thread-safe. - self._state.events.append(e) + # Using `append_event` (instead of `state.events.append`) also + # incrementally updates the cached `state.view`, keeping the view + # in sync without paying the O(n) `View.from_events` cost on each + # step. + self._state.append_event(e) # Track user MessageEvent IDs here so hook callbacks (which may # synthesize or alter user messages) are captured in one place. if isinstance(e, MessageEvent) and e.source == "user": @@ -383,9 +387,13 @@ def fork( ) # Deep-copy events from source → fork so the source stays - # immutable. + # immutable. Use `append_event` so the fork's cached view is + # built up incrementally alongside the event log, then call + # `rebuild_view` once at the end to run a full enforcement pass + # over the freshly-populated log (same posture as cold load). for event in self._state.events: - fork_conv._state.events.append(event.model_copy(deep=True)) + fork_conv._state.append_event(event.model_copy(deep=True)) + fork_conv._state.rebuild_view() # Copy runtime state that accumulated during the source # conversation. activated_knowledge_skills is list[str] – strings diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 7e137929f2..7b6d3f00e0 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -9,6 +9,7 @@ from pydantic import Field, PrivateAttr from openhands.sdk.agent.base import AgentBase +from openhands.sdk.context.view import View from openhands.sdk.conversation.conversation_stats import ConversationStats from openhands.sdk.conversation.event_store import EventLog from openhands.sdk.conversation.fifo_lock import FIFOLock @@ -205,6 +206,12 @@ class ConversationState(OpenHandsModel): # ===== Private attrs (NOT Fields) ===== _fs: FileStore = PrivateAttr() # filestore for persistence _events: EventLog = PrivateAttr() # now the storage for events + # Cached, incrementally-maintained projection of `_events`. Derived state, + # so it is intentionally not a model field and never persisted. Updated in + # `append_event` on every new event and fully re-derived (with property + # enforcement) via `rebuild_view` on cold load, fork, or error recovery. + # See https://github.com/OpenHands/software-agent-sdk/issues/3053. + _view: View = PrivateAttr(default_factory=View) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption _autosave_enabled: bool = PrivateAttr( default=False @@ -225,6 +232,59 @@ class ConversationState(OpenHandsModel): def events(self) -> EventLog: return self._events + @property + def view(self) -> View: + """Cached, incrementally-maintained `View` of the conversation events. + + The view is updated in lockstep with `events` via `append_event`, so it + always reflects the current `_events` log without paying the O(n) cost + of `View.from_events` on every read. It is fully re-derived (with + property enforcement) only by `rebuild_view`, which is intended for + cold load, fork, and explicit error recovery — not the per-step hot + path. + + Callers must treat the returned view as read-only. Mutating it + breaks the cache invariant and will cause silent divergence from + `events`. + """ + return self._view + + def append_event(self, event: Event) -> None: + """Append an event to the conversation, updating the cached view. + + This is the only sanctioned write path for adding events to a + running conversation: it persists the event via `EventLog.append` + and incrementally updates the cached `View` so the two stay in + sync. Callers (`_default_callback`, fork, etc.) must hold the + conversation lock; see callers in `LocalConversation` for the + canonical pattern. + + The incremental view update is O(1) for the common + `LLMConvertibleEvent` case and O(view-size) only when a + `Condensation` event is applied. It does not run + `enforce_properties`; that fallback path runs only via + `rebuild_view`. + """ + self._events.append(event) + self._view.append_event(event) + + def rebuild_view(self) -> None: + """Re-derive the cached view from the full event log. + + Runs `View.from_events` over the persisted log, which applies all + view property enforcement. This is the fallback path described in + `ViewPropertyBase` and should be used only on: + + - Cold load (resuming a persisted `ConversationState`), since the + on-disk events may have been written by an older version or + could be corrupted. + - Fork creation, after deep-copying events from the source state. + - Explicit error recovery (e.g., when the LLM rejects the current + message history as malformed and we want to re-validate the + view before retrying with condensation). + """ + self._view = View.from_events(self._events) + @property def env_observation_persistence_dir(self) -> str | None: """Directory for persisting environment observation files.""" @@ -355,6 +415,13 @@ def create( state._events = EventLog(file_store, dir_path=EVENTS_DIR) state._cipher = cipher + # Build the cached view from the persisted event log. This is the + # one place where we pay the full `View.from_events` cost (which + # runs property enforcement) — events may have been written by an + # older version of the code or otherwise be in a bad state, so + # treating this as a cold-load checkpoint is appropriate. + state.rebuild_view() + # Verify compatibility (agent class + tools) agent.verify(state.agent, events=state._events) diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py new file mode 100644 index 0000000000..61d6b51b56 --- /dev/null +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -0,0 +1,166 @@ +"""Tests for the cached `ConversationState.view` and the `append_event` / +`rebuild_view` write paths added for issue #3053. + +These tests assert that the incremental view stays in sync with the event log +without paying the cost of `View.from_events` on every append, and that the +full `enforce_properties` pass is reserved for explicit `rebuild_view` calls +(cold load, fork, error recovery). +""" + +from __future__ import annotations + +import uuid + +import pytest +from pydantic import SecretStr + +from openhands.sdk import LLM, Agent +from openhands.sdk.context.view import View +from openhands.sdk.conversation.event_store import EventLog +from openhands.sdk.conversation.state import ConversationState +from openhands.sdk.event.condenser import Condensation +from openhands.sdk.io import InMemoryFileStore +from openhands.sdk.workspace import LocalWorkspace +from tests.sdk.context.view.conftest import message_event + + +@pytest.fixture +def state() -> ConversationState: + """A bare ConversationState with an in-memory event log attached. + + We do not use `ConversationState.create` here because that path also + touches a LocalFileStore on disk; for these unit tests an in-memory + store is sufficient. + """ + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("test-key"), usage_id="test-llm") + agent = Agent(llm=llm) + workspace = LocalWorkspace(working_dir="/tmp/test") + + state = ConversationState( + id=uuid.uuid4(), + workspace=workspace, + persistence_dir=None, + agent=agent, + ) + state._fs = InMemoryFileStore() + state._events = EventLog(state._fs) + return state + + +def test_fresh_state_has_empty_view(state: ConversationState) -> None: + assert len(state.view) == 0 + assert state.view.events == [] + + +def test_append_event_updates_both_event_log_and_view( + state: ConversationState, +) -> None: + msg = message_event("hello") + state.append_event(msg) + + # EventLog rehydrates from disk on read, so we compare by id rather than + # identity for the underlying log; the view holds direct references and + # can be compared with `is`. + assert len(state.events) == 1 + assert state.events[0].id == msg.id + assert len(state.view) == 1 + assert state.view.events[0] is msg + + +def test_view_stays_in_parity_with_from_events_after_many_appends( + state: ConversationState, +) -> None: + msgs = [message_event(f"msg {i}") for i in range(5)] + for msg in msgs: + state.append_event(msg) + + rebuilt = View.from_events(state.events) + assert [e.id for e in state.view.events] == [e.id for e in rebuilt.events] + assert ( + state.view.unhandled_condensation_request + == rebuilt.unhandled_condensation_request + ) + + +def test_condensation_event_is_applied_incrementally( + state: ConversationState, +) -> None: + msgs = [message_event(f"msg {i}") for i in range(3)] + for msg in msgs: + state.append_event(msg) + + condensation = Condensation( + forgotten_event_ids={msgs[0].id, msgs[2].id}, + llm_response_id="resp_1", + ) + state.append_event(condensation) + + # The Condensation is still in the underlying log (it is not LLM-convertible + # but is part of the persisted history); the view, however, should reflect + # the condensation by dropping the forgotten messages. + assert len(state.view) == 1 + assert state.view.events[0] is msgs[1] + + +def test_append_event_does_not_run_enforce_on_hot_path( + state: ConversationState, monkeypatch: pytest.MonkeyPatch +) -> None: + """The hot path must not pay the cost of `enforce_properties`. + + This is the core perf invariant from #3053: incremental updates rely on + manipulation indices keeping the view well-formed, so enforcement should + only run on `rebuild_view`. + """ + call_count = 0 + original_enforce = View.enforce_properties + + def counting_enforce(self: View, all_events): # type: ignore[no-untyped-def] + nonlocal call_count + call_count += 1 + return original_enforce(self, all_events) + + monkeypatch.setattr(View, "enforce_properties", counting_enforce) + + for i in range(10): + state.append_event(message_event(f"msg {i}")) + + assert call_count == 0, ( + "append_event must not invoke enforce_properties on the hot path" + ) + + +def test_rebuild_view_runs_enforce( + state: ConversationState, monkeypatch: pytest.MonkeyPatch +) -> None: + """rebuild_view is the one place where full property enforcement runs.""" + call_count = 0 + original_enforce = View.enforce_properties + + def counting_enforce(self: View, all_events): # type: ignore[no-untyped-def] + nonlocal call_count + call_count += 1 + return original_enforce(self, all_events) + + monkeypatch.setattr(View, "enforce_properties", counting_enforce) + + for i in range(3): + state.append_event(message_event(f"msg {i}")) + + # Sanity: no enforce so far. + assert call_count == 0 + + state.rebuild_view() + assert call_count >= 1 + # Parity check after the rebuild. + assert [e.id for e in state.view.events] == [e.id for e in state.events] + + +def test_rebuild_view_replaces_cached_instance(state: ConversationState) -> None: + """rebuild_view should produce a fresh View instance derived from the log.""" + state.append_event(message_event("hello")) + before = state.view + state.rebuild_view() + after = state.view + + assert before is not after + assert [e.id for e in after.events] == [e.id for e in before.events] From 0abd27c7a1fbcfded0cd22b75dd406ea8294f9ca Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 20 May 2026 13:46:26 +0000 Subject: [PATCH 2/6] Rebuild cached View on malformed-history recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the LLM rejects the current message history as structurally malformed (broken tool_use/tool_result pairing, etc.), the incremental View we just sent may itself be the source of the problem — for example, a manipulation-indices bug that let a property slip past the inductive maintenance path. Re-derive the cached view from the event log with full `enforce_properties` before triggering the condensation retry, so the condenser operates on a clean, enforced view. This is one of the two explicit `rebuild_view` entry points described in the original design (the other being cold-load on `ConversationState.create`, which already exists in the previous commit on this branch). Refs: #3053 Co-authored-by: openhands --- openhands-sdk/openhands/sdk/agent/agent.py | 7 ++++ .../test_agent_context_window_condensation.py | 33 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index d1f59d6ffd..d2d00f8450 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -554,6 +554,13 @@ def step( "triggering condensation retry with condensed history: " f"{e}" ) + # The incremental view we just sent may itself be the source of + # the malformed history (e.g., a manipulation-indices bug that + # let a property slip). Re-derive it from the event log with + # full `enforce_properties` before kicking off the condensation + # retry, so the condenser operates on a clean, enforced view. + # See https://github.com/OpenHands/software-agent-sdk/issues/3053. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( diff --git a/tests/sdk/agent/test_agent_context_window_condensation.py b/tests/sdk/agent/test_agent_context_window_condensation.py index 28beda25d1..6bff123096 100644 --- a/tests/sdk/agent/test_agent_context_window_condensation.py +++ b/tests/sdk/agent/test_agent_context_window_condensation.py @@ -118,6 +118,39 @@ def on_event(e): ) +def test_agent_rebuilds_view_on_malformed_history_recovery(monkeypatch): + """The malformed-history recovery path must re-derive the cached view. + + If the incremental view itself is what produced the malformed history + (e.g., a manipulation-indices bug let a property slip), the condensation + retry needs a freshly enforced view to work from. See #3053. + """ + llm = MalformedHistoryRaisingLLM() + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + + convo._ensure_agent_ready() + + call_count = 0 + original_rebuild = type(convo.state).rebuild_view + + def counting_rebuild(self): + nonlocal call_count + call_count += 1 + return original_rebuild(self) + + monkeypatch.setattr(type(convo.state), "rebuild_view", counting_rebuild) + + seen = [] + agent.step(convo, on_event=seen.append) + + assert any(isinstance(e, CondensationRequest) for e in seen) + assert call_count == 1, ( + f"rebuild_view should be called exactly once on malformed-history " + f"recovery, got {call_count}" + ) + + @pytest.mark.parametrize("force_responses", [True, False]) def test_agent_raises_ctx_exceeded_when_no_condenser(force_responses: bool): llm = RaisingLLM(force_responses=force_responses) From ce1bc9c99483e931492c36152444797c7797aebd Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Wed, 20 May 2026 12:21:03 -0600 Subject: [PATCH 3/6] Address review: wire EventLog on_append callback for view sync, fix condense() race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread 1 – Guard against direct EventLog.append bypassing the view cache: Add _on_append callback to EventLog, invoked after every successful append with (event, synced_count). ConversationState._wire_view_sync() wires this callback so that both state.append_event() and direct state.events.append() keep the cached _view in sync. append_event() no longer calls _view.append_event() directly — the callback handles it uniformly. Thread 2 – Handle EventLog syncing extra events from disk: EventLog.append() now tracks how many events were synced from disk before writing the new event and passes this count to _on_append. When synced_count > 0, the callback performs a full View.from_events rebuild instead of an incremental update, so disk-synced events are not missed by the view. Thread 3 – Fix condense() race condition: Move _on_event(condensation_request) inside the `with self._state:` block in condense(), so the view mutation cannot race with a concurrent run(). Tests: - test_direct_eventlog_append_also_updates_view: verifies the on_append callback keeps the view in sync for direct EventLog.append calls. - test_view_rebuilds_when_eventlog_syncs_extra_events: simulates another writer and verifies the full rebuild path. Co-authored-by: openhands --- .../openhands/sdk/conversation/event_store.py | 20 ++++++ .../conversation/impl/local_conversation.py | 19 +++--- .../openhands/sdk/conversation/state.py | 68 +++++++++++++------ .../sdk/conversation/test_state_view_cache.py | 48 +++++++++++++ 4 files changed, 124 insertions(+), 31 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/event_store.py b/openhands-sdk/openhands/sdk/conversation/event_store.py index 9c3fa4e097..59e7b1b76d 100644 --- a/openhands-sdk/openhands/sdk/conversation/event_store.py +++ b/openhands-sdk/openhands/sdk/conversation/event_store.py @@ -40,6 +40,7 @@ class EventLog(EventsListBase): _length: int _lock_path: str _write_guard: Callable[[], AbstractContextManager[None]] | None + _on_append: Callable[[Event, int], None] | None def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: self._fs = fs @@ -48,6 +49,7 @@ def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: self._idx_to_id: dict[int, EventID] = {} self._lock_path = f"{dir_path}/{LOCK_FILE_NAME}" self._write_guard = None + self._on_append = None self._length = self._scan_and_build_index() def set_write_guard( @@ -56,6 +58,19 @@ def set_write_guard( ) -> None: self._write_guard = write_guard + def set_on_append( + self, + on_append: Callable[[Event, int], None] | None, + ) -> None: + """Set a callback invoked after every successful ``append``. + + The callback receives ``(event, synced_count)`` where + ``synced_count`` is the number of additional events that were + synced from disk inside this append (non-zero only when another + process wrote while this process was alive). + """ + self._on_append = on_append + def get_index(self, event_id: EventID) -> int: """Return the integer index for a given event_id.""" try: @@ -126,10 +141,12 @@ def append(self, event: Event) -> None: evt_id = event.id try: + synced_count = 0 with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS): # Sync with disk in case another process wrote while we waited disk_length = self._count_events_on_disk() if disk_length > self._length: + synced_count = disk_length - self._length self._sync_from_disk(disk_length) if evt_id in self._id_to_idx: @@ -149,6 +166,9 @@ def append(self, event: Event) -> None: self._idx_to_id[self._length] = evt_id self._id_to_idx[evt_id] = self._length self._length += 1 + + if self._on_append is not None: + self._on_append(event, synced_count) except TimeoutError: logger.error( f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s " diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 869959f7f8..1dda57ba04 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -198,10 +198,10 @@ def _default_callback(e): # This callback runs while holding the conversation state's lock # (see BaseConversation.compose_callbacks usage inside `with self._state:` # regions), so updating state here is thread-safe. - # Using `append_event` (instead of `state.events.append`) also - # incrementally updates the cached `state.view`, keeping the view - # in sync without paying the O(n) `View.from_events` cost on each - # step. + # `append_event` persists via EventLog.append, whose `_on_append` + # callback incrementally updates the cached `state.view`, keeping + # the view in sync without paying the O(n) `View.from_events` + # cost on each step. self._state.append_event(e) # Track user MessageEvent IDs here so hook callbacks (which may # synthesize or alter user messages) are captured in one place. @@ -1154,14 +1154,13 @@ def condense(self) -> None: ")" ) - # Add a condensation request event + # Force the agent to take a single step to process the condensation request. + # Both the request event emission and the step must be under the state + # lock so that the view mutation in _on_event cannot race with a + # concurrent run(). condensation_request = CondensationRequest() - self._on_event(condensation_request) - - # Force the agent to take a single step to process the condensation request - # This will trigger the condenser if it handles condensation requests with self._state: - # Take a single step to process the condensation request + self._on_event(condensation_request) self.agent.step(self, on_event=self._on_event, on_token=self._on_token) logger.info("Condensation request processed") diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 7b6d3f00e0..498b4c0a1f 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -207,9 +207,10 @@ class ConversationState(OpenHandsModel): _fs: FileStore = PrivateAttr() # filestore for persistence _events: EventLog = PrivateAttr() # now the storage for events # Cached, incrementally-maintained projection of `_events`. Derived state, - # so it is intentionally not a model field and never persisted. Updated in - # `append_event` on every new event and fully re-derived (with property - # enforcement) via `rebuild_view` on cold load, fork, or error recovery. + # so it is intentionally not a model field and never persisted. Kept in + # sync with the event log via the `_on_append` callback wired by + # `_wire_view_sync`, and fully re-derived (with property enforcement) + # via `rebuild_view` on cold load, fork, or error recovery. # See https://github.com/OpenHands/software-agent-sdk/issues/3053. _view: View = PrivateAttr(default_factory=View) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption @@ -236,37 +237,40 @@ def events(self) -> EventLog: def view(self) -> View: """Cached, incrementally-maintained `View` of the conversation events. - The view is updated in lockstep with `events` via `append_event`, so it - always reflects the current `_events` log without paying the O(n) cost - of `View.from_events` on every read. It is fully re-derived (with - property enforcement) only by `rebuild_view`, which is intended for - cold load, fork, and explicit error recovery — not the per-step hot - path. + The view is kept in sync with the event log automatically: every + ``EventLog.append`` triggers an ``_on_append`` callback that + performs either an O(1) incremental update or, when the log + synced extra events from disk, a full ``View.from_events`` + rebuild. The view is also fully re-derived (with property + enforcement) by ``rebuild_view``, intended for cold load, fork, + and explicit error recovery. Callers must treat the returned view as read-only. Mutating it breaks the cache invariant and will cause silent divergence from - `events`. + ``events``. """ return self._view def append_event(self, event: Event) -> None: """Append an event to the conversation, updating the cached view. - This is the only sanctioned write path for adding events to a - running conversation: it persists the event via `EventLog.append` - and incrementally updates the cached `View` so the two stay in - sync. Callers (`_default_callback`, fork, etc.) must hold the - conversation lock; see callers in `LocalConversation` for the - canonical pattern. + This is the preferred write path for adding events to a running + conversation. It persists the event via ``EventLog.append``, + which in turn fires the ``_on_append`` callback to + incrementally update the cached ``View``. The incremental view update is O(1) for the common - `LLMConvertibleEvent` case and O(view-size) only when a - `Condensation` event is applied. It does not run - `enforce_properties`; that fallback path runs only via - `rebuild_view`. + ``LLMConvertibleEvent`` case and O(view-size) only when a + ``Condensation`` event is applied. It does not run + ``enforce_properties``; that fallback path runs only via + ``rebuild_view``. + + The view cache is also maintained when events are appended + directly via ``state.events.append(event)`` (e.g. by external + callers), because the same ``_on_append`` callback is wired + to ``EventLog``. """ self._events.append(event) - self._view.append_event(event) def rebuild_view(self) -> None: """Re-derive the cached view from the full event log. @@ -285,6 +289,26 @@ def rebuild_view(self) -> None: """ self._view = View.from_events(self._events) + def _wire_view_sync(self) -> None: + """Wire the ``EventLog._on_append`` callback to keep ``_view`` current. + + Must be called after ``_events`` is assigned. Subsequent calls to + ``EventLog.append`` — whether via ``self.append_event`` or a + direct ``self.events.append`` — will automatically update the + cached ``_view``. + """ + + def _sync(event: Event, synced_count: int) -> None: + if synced_count > 0: + # Another writer added events between our last append and + # this one. The incremental view has missed those events, + # so a full rebuild is the only safe recovery. + self._view = View.from_events(self._events) + else: + self._view.append_event(event) + + self._events.set_on_append(_sync) + @property def env_observation_persistence_dir(self) -> str | None: """Directory for persisting environment observation files.""" @@ -421,6 +445,7 @@ def create( # older version of the code or otherwise be in a bad state, so # treating this as a cold-load checkpoint is appropriate. state.rebuild_view() + state._wire_view_sync() # Verify compatibility (agent class + tools) agent.verify(state.agent, events=state._events) @@ -454,6 +479,7 @@ def create( ) state._fs = file_store state._events = EventLog(file_store, dir_path=EVENTS_DIR) + state._wire_view_sync() state._cipher = cipher state.stats = ConversationStats() diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py index 61d6b51b56..928d29c170 100644 --- a/tests/sdk/conversation/test_state_view_cache.py +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -44,6 +44,7 @@ def state() -> ConversationState: ) state._fs = InMemoryFileStore() state._events = EventLog(state._fs) + state._wire_view_sync() return state @@ -164,3 +165,50 @@ def test_rebuild_view_replaces_cached_instance(state: ConversationState) -> None assert before is not after assert [e.id for e in after.events] == [e.id for e in before.events] + + +def test_direct_eventlog_append_also_updates_view( + state: ConversationState, +) -> None: + """Appending directly via state.events.append must still update the view. + + This verifies the on_append callback wired by _wire_view_sync, so + callers who bypass state.append_event do not silently diverge the + cached view. + """ + msg = message_event("direct") + state.events.append(msg) + + assert len(state.view) == 1 + assert state.view.events[0] is msg + + +def test_view_rebuilds_when_eventlog_syncs_extra_events( + state: ConversationState, +) -> None: + """If EventLog reports synced_count > 0, the view must be rebuilt. + + We simulate this by writing an event file directly to the in-memory + file store so that EventLog._sync_from_disk picks it up during the + next append. + """ + from openhands.sdk.conversation.persistence_const import EVENT_FILE_PATTERN + + # Manually insert an event file behind EventLog's back, simulating + # another process having written while this state was alive. + sneaky_msg = message_event("sneaky") + payload = sneaky_msg.model_dump_json(exclude_none=True) + path = f"events/{EVENT_FILE_PATTERN.format(idx=0, event_id=sneaky_msg.id)}" + state._fs.write(path, payload) + + # Now append a second event through the normal path. EventLog will + # notice the on-disk file during its lock-and-sync step and report + # synced_count=1, which should trigger a full view rebuild. + second_msg = message_event("second") + state.append_event(second_msg) + + # Both events should be in the view (rebuilt from the full log). + assert len(state.view) == 2 + view_ids = {e.id for e in state.view.events} + assert sneaky_msg.id in view_ids + assert second_msg.id in view_ids From 42f382a9c77bd99d8c6c72e8cd57fdecb438ab7a Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Wed, 20 May 2026 13:41:12 -0600 Subject: [PATCH 4/6] Address review round 2: callback ordering, error emission lock, fork optimization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread 6 – Move _on_append inside EventLog file lock: The callback now fires while the file lock is still held, so concurrent appends execute their callbacks in the same order as their writes. This prevents the view from diverging from the persisted event order. Thread 7 – Move run() error emission under state lock: The except handler in run() now wraps both the execution_status mutation and the ConversationErrorEvent emission in `with self._state:`, so the view update in _on_event cannot race with a concurrent send_message()/run(). Thread 4 – Suppress callback during fork bulk copy: The fork loop now suppresses the _on_append callback during the n-event bulk copy (set_on_append(None)), since rebuild_view() on the next line discards the incremental work. _wire_view_sync() is called after rebuild_view() to restore the callback. Thread 5 – Thread-safety contract documentation: Added docstring notes to set_on_append() and _wire_view_sync() making the locking contract explicit: the callback is serialized by the EventLog file lock, but callers must additionally hold the ConversationState lock to prevent races with non-append readers. Co-authored-by: openhands --- .../openhands/sdk/conversation/event_store.py | 17 +++++++-- .../conversation/impl/local_conversation.py | 35 +++++++++++-------- .../openhands/sdk/conversation/state.py | 9 +++++ 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/event_store.py b/openhands-sdk/openhands/sdk/conversation/event_store.py index 59e7b1b76d..f3c68d273b 100644 --- a/openhands-sdk/openhands/sdk/conversation/event_store.py +++ b/openhands-sdk/openhands/sdk/conversation/event_store.py @@ -68,6 +68,14 @@ def set_on_append( ``synced_count`` is the number of additional events that were synced from disk inside this append (non-zero only when another process wrote while this process was alive). + + The callback is invoked inside the ``EventLog`` file lock, so + concurrent appends always execute their callbacks in the same + order as their writes. However, if the callback mutates shared + in-memory state (e.g. a cached ``View``), callers must still + hold an appropriate application-level lock (such as the + ``ConversationState`` lock) to prevent races with non-append + readers or writers on that shared state. """ self._on_append = on_append @@ -141,10 +149,10 @@ def append(self, event: Event) -> None: evt_id = event.id try: - synced_count = 0 with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS): # Sync with disk in case another process wrote while we waited disk_length = self._count_events_on_disk() + synced_count = 0 if disk_length > self._length: synced_count = disk_length - self._length self._sync_from_disk(disk_length) @@ -167,8 +175,11 @@ def append(self, event: Event) -> None: self._id_to_idx[evt_id] = self._length self._length += 1 - if self._on_append is not None: - self._on_append(event, synced_count) + # Fire the callback inside the file lock so that concurrent + # appends execute their callbacks in the same order as their + # writes, preventing view divergence from the persisted log. + if self._on_append is not None: + self._on_append(event, synced_count) except TimeoutError: logger.error( f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s " diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 1dda57ba04..37d1665a90 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -386,14 +386,15 @@ def fork( tags=tags, ) - # Deep-copy events from source → fork so the source stays - # immutable. Use `append_event` so the fork's cached view is - # built up incrementally alongside the event log, then call - # `rebuild_view` once at the end to run a full enforcement pass - # over the freshly-populated log (same posture as cold load). + # Deep-copy events from source → fork. Suppress the on_append + # callback during the bulk copy: rebuild_view() at the end will + # derive the view from scratch, so paying n incremental view + # updates only to discard them is pure overhead. + fork_conv._state._events.set_on_append(None) for event in self._state.events: - fork_conv._state.append_event(event.model_copy(deep=True)) + fork_conv._state._events.append(event.model_copy(deep=True)) fork_conv._state.rebuild_view() + fork_conv._state._wire_view_sync() # Copy runtime state that accumulated during the source # conversation. activated_knowledge_skills is list[str] – strings @@ -881,16 +882,20 @@ def run(self) -> None: ) break except Exception as e: - self._state.execution_status = ConversationExecutionStatus.ERROR - - # Add an error event - self._on_event( - ConversationErrorEvent( - source="environment", - code=e.__class__.__name__, - detail=str(e), + # Hold the state lock while mutating execution_status and + # emitting the error event so the view update in _on_event + # cannot race with a concurrent send_message()/run(). + with self._state: + self._state.execution_status = ConversationExecutionStatus.ERROR + + # Add an error event + self._on_event( + ConversationErrorEvent( + source="environment", + code=e.__class__.__name__, + detail=str(e), + ) ) - ) # Re-raise with conversation id and persistence dir for better UX raise ConversationRunError( diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 498b4c0a1f..c66883bffb 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -296,6 +296,15 @@ def _wire_view_sync(self) -> None: ``EventLog.append`` — whether via ``self.append_event`` or a direct ``self.events.append`` — will automatically update the cached ``_view``. + + Thread-safety note: the callback executes inside the ``EventLog`` + file lock, which serializes writes. Callers of + ``EventLog.append`` must additionally hold the + ``ConversationState`` lock (``with self._state:``) to prevent + races between the ``_view`` mutation here and non-append readers + or writers. All production call sites in ``LocalConversation`` + satisfy this — see ``_default_callback``, ``fork``, and + ``condense``. """ def _sync(event: Event, synced_count: int) -> None: From 4ade933e8c0ef3262ea4c05937708006c9c05081 Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Wed, 20 May 2026 14:25:02 -0600 Subject: [PATCH 5/6] Address review round 3: incremental sync, orphan guard, close() lock, fork safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread A – Replace O(N) rebuild with incremental sync: The _sync callback now applies only the synced_count missed events incrementally rather than rebuilding the entire view via View.from_events. Lock hold is now O(synced_count) instead of O(N). Thread B – try/finally in fork bulk copy: rebuild_view() + _wire_view_sync() now run in a finally block so the _on_append callback is unconditionally restored even if a deep-copy raises during the bulk event copy. Thread C – CondensationRequest flag test: Added test_condensation_request_sets_flag_incrementally to verify the incremental path sets unhandled_condensation_request without adding the event to the view (not LLMConvertible). Thread D – Orphaned events in fresh-create path: If events/ has files but base_state.json is missing (crash / partial cleanup), the fresh-create path now calls rebuild_view() to populate the cached view from pre-existing events. Corrupted orphans degrade gracefully to an empty view with a warning. Added regression test. Thread E – close() session-end hooks under state lock: run_session_end() in close() now runs inside `with self._state:` so hook-emitted events cannot race with concurrent run()/send_message(). Updated _wire_view_sync docstring to list close() and run() error handler as call sites that satisfy the lock contract. Co-authored-by: openhands --- .../conversation/impl/local_conversation.py | 23 ++++-- .../openhands/sdk/conversation/state.py | 34 +++++++-- .../sdk/conversation/test_state_view_cache.py | 72 ++++++++++++++++--- 3 files changed, 108 insertions(+), 21 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 37d1665a90..0ad6070bb5 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -390,11 +390,15 @@ def fork( # callback during the bulk copy: rebuild_view() at the end will # derive the view from scratch, so paying n incremental view # updates only to discard them is pure overhead. + # try/finally ensures the callback is restored even if an + # event copy raises. fork_conv._state._events.set_on_append(None) - for event in self._state.events: - fork_conv._state._events.append(event.model_copy(deep=True)) - fork_conv._state.rebuild_view() - fork_conv._state._wire_view_sync() + try: + for event in self._state.events: + fork_conv._state._events.append(event.model_copy(deep=True)) + finally: + fork_conv._state.rebuild_view() + fork_conv._state._wire_view_sync() # Copy runtime state that accumulated during the source # conversation. activated_knowledge_skills is list[str] – strings @@ -1000,7 +1004,16 @@ def close(self) -> None: logger.debug("Closing conversation and cleaning up tool executors") hook_processor = getattr(self, "_hook_processor", None) if hook_processor is not None: - hook_processor.run_session_end() + # Session-end hooks emit events through the callback chain, + # which reaches _default_callback → append_event → view + # mutation. Hold the state lock so this cannot race with a + # concurrent run()/send_message(). + state = getattr(self, "_state", None) + if state is not None: + with state: + hook_processor.run_session_end() + else: + hook_processor.run_session_end() try: self._end_observability_span() except AttributeError: diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index c66883bffb..7d67099d2f 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -303,18 +303,22 @@ def _wire_view_sync(self) -> None: ``ConversationState`` lock (``with self._state:``) to prevent races between the ``_view`` mutation here and non-append readers or writers. All production call sites in ``LocalConversation`` - satisfy this — see ``_default_callback``, ``fork``, and - ``condense``. + satisfy this — see ``_default_callback``, ``fork``, + ``condense``, ``run`` (error handler), and ``close`` + (session-end hooks). """ def _sync(event: Event, synced_count: int) -> None: if synced_count > 0: # Another writer added events between our last append and - # this one. The incremental view has missed those events, - # so a full rebuild is the only safe recovery. - self._view = View.from_events(self._events) - else: - self._view.append_event(event) + # this one. Apply only the missed events incrementally + # rather than rebuilding the entire view, so the lock + # hold stays O(synced_count) instead of O(N). + new_len = len(self._events) + start = new_len - synced_count - 1 + for i in range(start, new_len - 1): + self._view.append_event(self._events[i]) + self._view.append_event(event) self._events.set_on_append(_sync) @@ -489,6 +493,22 @@ def create( state._fs = file_store state._events = EventLog(file_store, dir_path=EVENTS_DIR) state._wire_view_sync() + # Guard against orphaned event files from a previous incomplete + # cleanup or crash: base_state.json is missing (fresh path) but + # events/ may already contain files. Without this rebuild the + # cached view would start empty while the log is non-empty, and + # future appends would never pick up the pre-existing events. + # If the orphaned files are corrupted we fall back to an empty + # view — they will still surface errors on explicit iteration. + if len(state._events) > 0: + try: + state.rebuild_view() + except Exception: + logger.warning( + "Failed to rebuild view from orphaned events for " + "conversation %s; starting with empty view", + state.id, + ) state._cipher = cipher state.stats = ConversationStats() diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py index 928d29c170..dcadafcc5b 100644 --- a/tests/sdk/conversation/test_state_view_cache.py +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -17,6 +17,7 @@ from openhands.sdk import LLM, Agent from openhands.sdk.context.view import View from openhands.sdk.conversation.event_store import EventLog +from openhands.sdk.conversation.persistence_const import EVENT_FILE_PATTERN, EVENTS_DIR from openhands.sdk.conversation.state import ConversationState from openhands.sdk.event.condenser import Condensation from openhands.sdk.io import InMemoryFileStore @@ -103,6 +104,18 @@ def test_condensation_event_is_applied_incrementally( assert state.view.events[0] is msgs[1] +def test_condensation_request_sets_flag_incrementally( + state: ConversationState, +) -> None: + """CondensationRequest should set the unhandled flag without adding + an event to the view (it is not LLMConvertible).""" + from openhands.sdk.event.condenser import CondensationRequest + + state.append_event(CondensationRequest()) + assert state.view.unhandled_condensation_request is True + assert len(state.view) == 0 + + def test_append_event_does_not_run_enforce_on_hot_path( state: ConversationState, monkeypatch: pytest.MonkeyPatch ) -> None: @@ -183,17 +196,16 @@ def test_direct_eventlog_append_also_updates_view( assert state.view.events[0] is msg -def test_view_rebuilds_when_eventlog_syncs_extra_events( +def test_view_syncs_when_eventlog_syncs_extra_events( state: ConversationState, ) -> None: - """If EventLog reports synced_count > 0, the view must be rebuilt. + """If EventLog reports synced_count > 0, the missed events must be + incrementally applied to the view before the new event. We simulate this by writing an event file directly to the in-memory file store so that EventLog._sync_from_disk picks it up during the next append. """ - from openhands.sdk.conversation.persistence_const import EVENT_FILE_PATTERN - # Manually insert an event file behind EventLog's back, simulating # another process having written while this state was alive. sneaky_msg = message_event("sneaky") @@ -203,12 +215,54 @@ def test_view_rebuilds_when_eventlog_syncs_extra_events( # Now append a second event through the normal path. EventLog will # notice the on-disk file during its lock-and-sync step and report - # synced_count=1, which should trigger a full view rebuild. + # synced_count=1, which should trigger incremental application of + # the missed event followed by the new one. second_msg = message_event("second") state.append_event(second_msg) - # Both events should be in the view (rebuilt from the full log). + # Both events should be in the view in the correct order. + assert len(state.view) == 2 + view_ids = [e.id for e in state.view.events] + assert view_ids[0] == sneaky_msg.id + assert view_ids[1] == second_msg.id + + +def test_fresh_create_rebuilds_view_for_orphaned_events() -> None: + """ConversationState.create on a directory that has event files but no + base_state.json (crash / partial cleanup) must still populate the + cached view from the pre-existing events.""" + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("test-key"), usage_id="test-llm") + agent = Agent(llm=llm) + workspace = LocalWorkspace(working_dir="/tmp/test") + + # Set up an in-memory file store with orphaned event files but NO + # base_state.json, simulating a crash before state was saved. + fs = InMemoryFileStore() + orphan_msg = message_event("orphan") + payload = orphan_msg.model_dump_json(exclude_none=True) + path = f"{EVENTS_DIR}/{EVENT_FILE_PATTERN.format(idx=0, event_id=orphan_msg.id)}" + fs.write(path, payload) + + state = ConversationState( + id=uuid.uuid4(), + workspace=workspace, + persistence_dir=None, + agent=agent, + ) + state._fs = fs + state._events = EventLog(fs, dir_path=EVENTS_DIR) + state._wire_view_sync() + + # Mimic the fresh-create guard: rebuild if events already exist. + if len(state._events) > 0: + state.rebuild_view() + + # The orphaned event should be visible in the cached view. + assert len(state.view) == 1 + assert state.view.events[0].id == orphan_msg.id + + # Future appends should still work correctly. + new_msg = message_event("new") + state.append_event(new_msg) assert len(state.view) == 2 - view_ids = {e.id for e in state.view.events} - assert sneaky_msg.id in view_ids - assert second_msg.id in view_ids + assert state.view.events[1].id == new_msg.id From 1e1d4f4837b76bac9a9a1a77997c49b13ff0fdd0 Mon Sep 17 00:00:00 2001 From: Calvin Smith Date: Wed, 20 May 2026 14:49:44 -0600 Subject: [PATCH 6/6] Address review round 4: nested try/finally, orphan test via create(), astep() parity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread A – Nested try/finally in fork bulk copy: If rebuild_view() raises, _wire_view_sync() was silently skipped because a single finally block aborts on its first exception. Nested try/finally ensures the callback is unconditionally restored. Thread B – Orphan test exercises ConversationState.create(): Replaced the manual mimicry of the orphan guard with a real ConversationState.create(persistence_dir=tmpdir) call against a temp directory containing orphaned event files on disk. Any regression in the production code path is now caught. Thread C – rebuild_view() parity in astep(): The malformed-history handler in astep() now calls state.rebuild_view() before emitting CondensationRequest, mirroring the sync step() path. Added async regression test with acompletion/aresponses overrides on the test LLM. Co-authored-by: openhands --- openhands-sdk/openhands/sdk/agent/agent.py | 4 ++ .../conversation/impl/local_conversation.py | 10 ++-- .../test_agent_context_window_condensation.py | 46 +++++++++++++++++++ .../sdk/conversation/test_state_view_cache.py | 38 +++++++-------- 4 files changed, 76 insertions(+), 22 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index 17eeaf0191..5a923e9224 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -776,6 +776,10 @@ async def astep( "history: %s", e, ) + # Re-derive the cached view with full enforce_properties + # before the condensation retry — mirrors step(). See + # https://github.com/OpenHands/software-agent-sdk/issues/3053. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 3fca093703..3fd0de9675 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -411,15 +411,17 @@ def fork( # callback during the bulk copy: rebuild_view() at the end will # derive the view from scratch, so paying n incremental view # updates only to discard them is pure overhead. - # try/finally ensures the callback is restored even if an - # event copy raises. + # Nested try/finally ensures _wire_view_sync() restores the + # callback even if rebuild_view() itself raises. fork_conv._state._events.set_on_append(None) try: for event in self._state.events: fork_conv._state._events.append(event.model_copy(deep=True)) finally: - fork_conv._state.rebuild_view() - fork_conv._state._wire_view_sync() + try: + fork_conv._state.rebuild_view() + finally: + fork_conv._state._wire_view_sync() # Copy runtime state that accumulated during the source # conversation. activated_knowledge_skills is list[str] – strings diff --git a/tests/sdk/agent/test_agent_context_window_condensation.py b/tests/sdk/agent/test_agent_context_window_condensation.py index 6bff123096..4fa49c1341 100644 --- a/tests/sdk/agent/test_agent_context_window_condensation.py +++ b/tests/sdk/agent/test_agent_context_window_condensation.py @@ -58,6 +58,18 @@ def responses(self, *, messages, tools=None, **kwargs): # type: ignore[override "immediately after" ) + async def acompletion(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + + async def aresponses(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + class HandlesRequestsCondenser(CondenserBase): def condense( @@ -151,6 +163,40 @@ def counting_rebuild(self): ) +async def test_agent_rebuilds_view_on_malformed_history_recovery_async( + monkeypatch, +): + """Async counterpart of the sync rebuild_view test. + + ``astep()`` must also call ``state.rebuild_view()`` before emitting + ``CondensationRequest`` on malformed-history recovery. See #3053. + """ + llm = MalformedHistoryRaisingLLM() + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + + convo._ensure_agent_ready() + + call_count = 0 + original_rebuild = type(convo.state).rebuild_view + + def counting_rebuild(self): + nonlocal call_count + call_count += 1 + return original_rebuild(self) + + monkeypatch.setattr(type(convo.state), "rebuild_view", counting_rebuild) + + seen: list = [] + await agent.astep(convo, on_event=seen.append) + + assert any(isinstance(e, CondensationRequest) for e in seen) + assert call_count == 1, ( + f"rebuild_view should be called exactly once on malformed-history " + f"recovery (async), got {call_count}" + ) + + @pytest.mark.parametrize("force_responses", [True, False]) def test_agent_raises_ctx_exceeded_when_no_condenser(force_responses: bool): llm = RaisingLLM(force_responses=force_responses) diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py index dcadafcc5b..4d48d1f534 100644 --- a/tests/sdk/conversation/test_state_view_cache.py +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -10,6 +10,7 @@ from __future__ import annotations import uuid +from pathlib import Path import pytest from pydantic import SecretStr @@ -227,35 +228,36 @@ def test_view_syncs_when_eventlog_syncs_extra_events( assert view_ids[1] == second_msg.id -def test_fresh_create_rebuilds_view_for_orphaned_events() -> None: +def test_fresh_create_rebuilds_view_for_orphaned_events( + tmp_path: Path, +) -> None: """ConversationState.create on a directory that has event files but no base_state.json (crash / partial cleanup) must still populate the - cached view from the pre-existing events.""" + cached view from the pre-existing events. + + Uses the real ``ConversationState.create()`` factory so that any + regression in the production orphan-handling guard is caught. + """ llm = LLM(model="gpt-4o-mini", api_key=SecretStr("test-key"), usage_id="test-llm") agent = Agent(llm=llm) workspace = LocalWorkspace(working_dir="/tmp/test") - # Set up an in-memory file store with orphaned event files but NO - # base_state.json, simulating a crash before state was saved. - fs = InMemoryFileStore() + # Write an orphaned event file on disk with no base_state.json, + # simulating a crash before state was fully saved. + events_dir = tmp_path / EVENTS_DIR + events_dir.mkdir(parents=True) orphan_msg = message_event("orphan") - payload = orphan_msg.model_dump_json(exclude_none=True) - path = f"{EVENTS_DIR}/{EVENT_FILE_PATTERN.format(idx=0, event_id=orphan_msg.id)}" - fs.write(path, payload) + event_filename = EVENT_FILE_PATTERN.format(idx=0, event_id=orphan_msg.id) + (events_dir / event_filename).write_text( + orphan_msg.model_dump_json(exclude_none=True) + ) - state = ConversationState( + state = ConversationState.create( id=uuid.uuid4(), - workspace=workspace, - persistence_dir=None, agent=agent, + workspace=workspace, + persistence_dir=str(tmp_path), ) - state._fs = fs - state._events = EventLog(fs, dir_path=EVENTS_DIR) - state._wire_view_sync() - - # Mimic the fresh-create guard: rebuild if events already exist. - if len(state._events) > 0: - state.rebuild_view() # The orphaned event should be visible in the cached view. assert len(state.view) == 1