diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index efafbe69c9..5a923e9224 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -634,6 +634,13 @@ def step( "triggering condensation retry with condensed history: " f"{e}" ) + # The incremental view we just sent may itself be the source of + # the malformed history (e.g., a manipulation-indices bug that + # let a property slip). Re-derive it from the event log with + # full `enforce_properties` before kicking off the condensation + # retry, so the condenser operates on a clean, enforced view. + # See https://github.com/OpenHands/software-agent-sdk/issues/3053. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( @@ -769,6 +776,10 @@ async def astep( "history: %s", e, ) + # Re-derive the cached view with full enforce_properties + # before the condensation retry — mirrors step(). See + # https://github.com/OpenHands/software-agent-sdk/issues/3053. + state.rebuild_view() on_event(CondensationRequest()) return logger.warning( diff --git a/openhands-sdk/openhands/sdk/context/condenser/base.py b/openhands-sdk/openhands/sdk/context/condenser/base.py index 22333bb36b..310f648eb7 100644 --- a/openhands-sdk/openhands/sdk/context/condenser/base.py +++ b/openhands-sdk/openhands/sdk/context/condenser/base.py @@ -39,6 +39,12 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat Args: view: A view of the history containing all events that should be condensed. + **Implementations must treat this view as read-only.** The view may be + a cached projection owned by `ConversationState` + (see https://github.com/OpenHands/software-agent-sdk/issues/3053), and + mutating it in place will corrupt that cache and silently desynchronize + the conversation from its event log. Return a new `View` (e.g. + `View(events=view.events[k:])`) or a `Condensation` event instead. agent_llm: LLM instance used by the agent. Condensers use this for token counting purposes. Defaults to None. diff --git a/openhands-sdk/openhands/sdk/conversation/event_store.py b/openhands-sdk/openhands/sdk/conversation/event_store.py index 9c3fa4e097..f3c68d273b 100644 --- a/openhands-sdk/openhands/sdk/conversation/event_store.py +++ b/openhands-sdk/openhands/sdk/conversation/event_store.py @@ -40,6 +40,7 @@ class EventLog(EventsListBase): _length: int _lock_path: str _write_guard: Callable[[], AbstractContextManager[None]] | None + _on_append: Callable[[Event, int], None] | None def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: self._fs = fs @@ -48,6 +49,7 @@ def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: self._idx_to_id: dict[int, EventID] = {} self._lock_path = f"{dir_path}/{LOCK_FILE_NAME}" self._write_guard = None + self._on_append = None self._length = self._scan_and_build_index() def set_write_guard( @@ -56,6 +58,27 @@ def set_write_guard( ) -> None: self._write_guard = write_guard + def set_on_append( + self, + on_append: Callable[[Event, int], None] | None, + ) -> None: + """Set a callback invoked after every successful ``append``. + + The callback receives ``(event, synced_count)`` where + ``synced_count`` is the number of additional events that were + synced from disk inside this append (non-zero only when another + process wrote while this process was alive). + + The callback is invoked inside the ``EventLog`` file lock, so + concurrent appends always execute their callbacks in the same + order as their writes. However, if the callback mutates shared + in-memory state (e.g. a cached ``View``), callers must still + hold an appropriate application-level lock (such as the + ``ConversationState`` lock) to prevent races with non-append + readers or writers on that shared state. + """ + self._on_append = on_append + def get_index(self, event_id: EventID) -> int: """Return the integer index for a given event_id.""" try: @@ -129,7 +152,9 @@ def append(self, event: Event) -> None: with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS): # Sync with disk in case another process wrote while we waited disk_length = self._count_events_on_disk() + synced_count = 0 if disk_length > self._length: + synced_count = disk_length - self._length self._sync_from_disk(disk_length) if evt_id in self._id_to_idx: @@ -149,6 +174,12 @@ def append(self, event: Event) -> None: self._idx_to_id[self._length] = evt_id self._id_to_idx[evt_id] = self._length self._length += 1 + + # Fire the callback inside the file lock so that concurrent + # appends execute their callbacks in the same order as their + # writes, preventing view divergence from the persisted log. + if self._on_append is not None: + self._on_append(event, synced_count) except TimeoutError: logger.error( f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s " diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index fd65dca465..3fd0de9675 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -206,7 +206,11 @@ def _default_callback(e): # This callback runs while holding the conversation state's lock # (see BaseConversation.compose_callbacks usage inside `with self._state:` # regions), so updating state here is thread-safe. - self._state.events.append(e) + # `append_event` persists via EventLog.append, whose `_on_append` + # callback incrementally updates the cached `state.view`, keeping + # the view in sync without paying the O(n) `View.from_events` + # cost on each step. + self._state.append_event(e) # Track user MessageEvent IDs here so hook callbacks (which may # synthesize or alter user messages) are captured in one place. if isinstance(e, MessageEvent) and e.source == "user": @@ -403,10 +407,21 @@ def fork( tags=tags, ) - # Deep-copy events from source → fork so the source stays - # immutable. - for event in self._state.events: - fork_conv._state.events.append(event.model_copy(deep=True)) + # Deep-copy events from source → fork. Suppress the on_append + # callback during the bulk copy: rebuild_view() at the end will + # derive the view from scratch, so paying n incremental view + # updates only to discard them is pure overhead. + # Nested try/finally ensures _wire_view_sync() restores the + # callback even if rebuild_view() itself raises. + fork_conv._state._events.set_on_append(None) + try: + for event in self._state.events: + fork_conv._state._events.append(event.model_copy(deep=True)) + finally: + try: + fork_conv._state.rebuild_view() + finally: + fork_conv._state._wire_view_sync() # Copy runtime state that accumulated during the source # conversation. activated_knowledge_skills is list[str] – strings @@ -895,6 +910,9 @@ def run(self) -> None: ) break except Exception as e: + # Hold the state lock while mutating execution_status and + # emitting the error event so the view update in _on_event + # cannot race with a concurrent send_message()/run(). with self._state: self._state.execution_status = ConversationExecutionStatus.ERROR @@ -1229,7 +1247,16 @@ def close(self) -> None: logger.debug("Closing conversation and cleaning up tool executors") hook_processor = getattr(self, "_hook_processor", None) if hook_processor is not None: - hook_processor.run_session_end() + # Session-end hooks emit events through the callback chain, + # which reaches _default_callback → append_event → view + # mutation. Hold the state lock so this cannot race with a + # concurrent run()/send_message(). + state = getattr(self, "_state", None) + if state is not None: + with state: + hook_processor.run_session_end() + else: + hook_processor.run_session_end() try: self._end_observability_span() except AttributeError: @@ -1388,14 +1415,13 @@ def condense(self) -> None: ")" ) - # Add a condensation request event + # Force the agent to take a single step to process the condensation request. + # Both the request event emission and the step must be under the state + # lock so that the view mutation in _on_event cannot race with a + # concurrent run(). condensation_request = CondensationRequest() - self._on_event(condensation_request) - - # Force the agent to take a single step to process the condensation request - # This will trigger the condenser if it handles condensation requests with self._state: - # Take a single step to process the condensation request + self._on_event(condensation_request) self.agent.step(self, on_event=self._on_event, on_token=self._on_token) logger.info("Condensation request processed") diff --git a/openhands-sdk/openhands/sdk/conversation/state.py b/openhands-sdk/openhands/sdk/conversation/state.py index 7e137929f2..7d67099d2f 100644 --- a/openhands-sdk/openhands/sdk/conversation/state.py +++ b/openhands-sdk/openhands/sdk/conversation/state.py @@ -9,6 +9,7 @@ from pydantic import Field, PrivateAttr from openhands.sdk.agent.base import AgentBase +from openhands.sdk.context.view import View from openhands.sdk.conversation.conversation_stats import ConversationStats from openhands.sdk.conversation.event_store import EventLog from openhands.sdk.conversation.fifo_lock import FIFOLock @@ -205,6 +206,13 @@ class ConversationState(OpenHandsModel): # ===== Private attrs (NOT Fields) ===== _fs: FileStore = PrivateAttr() # filestore for persistence _events: EventLog = PrivateAttr() # now the storage for events + # Cached, incrementally-maintained projection of `_events`. Derived state, + # so it is intentionally not a model field and never persisted. Kept in + # sync with the event log via the `_on_append` callback wired by + # `_wire_view_sync`, and fully re-derived (with property enforcement) + # via `rebuild_view` on cold load, fork, or error recovery. + # See https://github.com/OpenHands/software-agent-sdk/issues/3053. + _view: View = PrivateAttr(default_factory=View) _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption _autosave_enabled: bool = PrivateAttr( default=False @@ -225,6 +233,95 @@ class ConversationState(OpenHandsModel): def events(self) -> EventLog: return self._events + @property + def view(self) -> View: + """Cached, incrementally-maintained `View` of the conversation events. + + The view is kept in sync with the event log automatically: every + ``EventLog.append`` triggers an ``_on_append`` callback that + performs either an O(1) incremental update or, when the log + synced extra events from disk, a full ``View.from_events`` + rebuild. The view is also fully re-derived (with property + enforcement) by ``rebuild_view``, intended for cold load, fork, + and explicit error recovery. + + Callers must treat the returned view as read-only. Mutating it + breaks the cache invariant and will cause silent divergence from + ``events``. + """ + return self._view + + def append_event(self, event: Event) -> None: + """Append an event to the conversation, updating the cached view. + + This is the preferred write path for adding events to a running + conversation. It persists the event via ``EventLog.append``, + which in turn fires the ``_on_append`` callback to + incrementally update the cached ``View``. + + The incremental view update is O(1) for the common + ``LLMConvertibleEvent`` case and O(view-size) only when a + ``Condensation`` event is applied. It does not run + ``enforce_properties``; that fallback path runs only via + ``rebuild_view``. + + The view cache is also maintained when events are appended + directly via ``state.events.append(event)`` (e.g. by external + callers), because the same ``_on_append`` callback is wired + to ``EventLog``. + """ + self._events.append(event) + + def rebuild_view(self) -> None: + """Re-derive the cached view from the full event log. + + Runs `View.from_events` over the persisted log, which applies all + view property enforcement. This is the fallback path described in + `ViewPropertyBase` and should be used only on: + + - Cold load (resuming a persisted `ConversationState`), since the + on-disk events may have been written by an older version or + could be corrupted. + - Fork creation, after deep-copying events from the source state. + - Explicit error recovery (e.g., when the LLM rejects the current + message history as malformed and we want to re-validate the + view before retrying with condensation). + """ + self._view = View.from_events(self._events) + + def _wire_view_sync(self) -> None: + """Wire the ``EventLog._on_append`` callback to keep ``_view`` current. + + Must be called after ``_events`` is assigned. Subsequent calls to + ``EventLog.append`` — whether via ``self.append_event`` or a + direct ``self.events.append`` — will automatically update the + cached ``_view``. + + Thread-safety note: the callback executes inside the ``EventLog`` + file lock, which serializes writes. Callers of + ``EventLog.append`` must additionally hold the + ``ConversationState`` lock (``with self._state:``) to prevent + races between the ``_view`` mutation here and non-append readers + or writers. All production call sites in ``LocalConversation`` + satisfy this — see ``_default_callback``, ``fork``, + ``condense``, ``run`` (error handler), and ``close`` + (session-end hooks). + """ + + def _sync(event: Event, synced_count: int) -> None: + if synced_count > 0: + # Another writer added events between our last append and + # this one. Apply only the missed events incrementally + # rather than rebuilding the entire view, so the lock + # hold stays O(synced_count) instead of O(N). + new_len = len(self._events) + start = new_len - synced_count - 1 + for i in range(start, new_len - 1): + self._view.append_event(self._events[i]) + self._view.append_event(event) + + self._events.set_on_append(_sync) + @property def env_observation_persistence_dir(self) -> str | None: """Directory for persisting environment observation files.""" @@ -355,6 +452,14 @@ def create( state._events = EventLog(file_store, dir_path=EVENTS_DIR) state._cipher = cipher + # Build the cached view from the persisted event log. This is the + # one place where we pay the full `View.from_events` cost (which + # runs property enforcement) — events may have been written by an + # older version of the code or otherwise be in a bad state, so + # treating this as a cold-load checkpoint is appropriate. + state.rebuild_view() + state._wire_view_sync() + # Verify compatibility (agent class + tools) agent.verify(state.agent, events=state._events) @@ -387,6 +492,23 @@ def create( ) state._fs = file_store state._events = EventLog(file_store, dir_path=EVENTS_DIR) + state._wire_view_sync() + # Guard against orphaned event files from a previous incomplete + # cleanup or crash: base_state.json is missing (fresh path) but + # events/ may already contain files. Without this rebuild the + # cached view would start empty while the log is non-empty, and + # future appends would never pick up the pre-existing events. + # If the orphaned files are corrupted we fall back to an empty + # view — they will still surface errors on explicit iteration. + if len(state._events) > 0: + try: + state.rebuild_view() + except Exception: + logger.warning( + "Failed to rebuild view from orphaned events for " + "conversation %s; starting with empty view", + state.id, + ) state._cipher = cipher state.stats = ConversationStats() diff --git a/tests/sdk/agent/test_agent_context_window_condensation.py b/tests/sdk/agent/test_agent_context_window_condensation.py index 28beda25d1..4fa49c1341 100644 --- a/tests/sdk/agent/test_agent_context_window_condensation.py +++ b/tests/sdk/agent/test_agent_context_window_condensation.py @@ -58,6 +58,18 @@ def responses(self, *, messages, tools=None, **kwargs): # type: ignore[override "immediately after" ) + async def acompletion(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + + async def aresponses(self, *, messages, tools=None, **kwargs): # type: ignore[override] + raise LLMMalformedConversationHistoryError( + "messages.134: `tool_use` ids were found without `tool_result` blocks " + "immediately after" + ) + class HandlesRequestsCondenser(CondenserBase): def condense( @@ -118,6 +130,73 @@ def on_event(e): ) +def test_agent_rebuilds_view_on_malformed_history_recovery(monkeypatch): + """The malformed-history recovery path must re-derive the cached view. + + If the incremental view itself is what produced the malformed history + (e.g., a manipulation-indices bug let a property slip), the condensation + retry needs a freshly enforced view to work from. See #3053. + """ + llm = MalformedHistoryRaisingLLM() + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + + convo._ensure_agent_ready() + + call_count = 0 + original_rebuild = type(convo.state).rebuild_view + + def counting_rebuild(self): + nonlocal call_count + call_count += 1 + return original_rebuild(self) + + monkeypatch.setattr(type(convo.state), "rebuild_view", counting_rebuild) + + seen = [] + agent.step(convo, on_event=seen.append) + + assert any(isinstance(e, CondensationRequest) for e in seen) + assert call_count == 1, ( + f"rebuild_view should be called exactly once on malformed-history " + f"recovery, got {call_count}" + ) + + +async def test_agent_rebuilds_view_on_malformed_history_recovery_async( + monkeypatch, +): + """Async counterpart of the sync rebuild_view test. + + ``astep()`` must also call ``state.rebuild_view()`` before emitting + ``CondensationRequest`` on malformed-history recovery. See #3053. + """ + llm = MalformedHistoryRaisingLLM() + agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser()) + convo = Conversation(agent=agent) + + convo._ensure_agent_ready() + + call_count = 0 + original_rebuild = type(convo.state).rebuild_view + + def counting_rebuild(self): + nonlocal call_count + call_count += 1 + return original_rebuild(self) + + monkeypatch.setattr(type(convo.state), "rebuild_view", counting_rebuild) + + seen: list = [] + await agent.astep(convo, on_event=seen.append) + + assert any(isinstance(e, CondensationRequest) for e in seen) + assert call_count == 1, ( + f"rebuild_view should be called exactly once on malformed-history " + f"recovery (async), got {call_count}" + ) + + @pytest.mark.parametrize("force_responses", [True, False]) def test_agent_raises_ctx_exceeded_when_no_condenser(force_responses: bool): llm = RaisingLLM(force_responses=force_responses) diff --git a/tests/sdk/conversation/test_state_view_cache.py b/tests/sdk/conversation/test_state_view_cache.py new file mode 100644 index 0000000000..4d48d1f534 --- /dev/null +++ b/tests/sdk/conversation/test_state_view_cache.py @@ -0,0 +1,270 @@ +"""Tests for the cached `ConversationState.view` and the `append_event` / +`rebuild_view` write paths added for issue #3053. + +These tests assert that the incremental view stays in sync with the event log +without paying the cost of `View.from_events` on every append, and that the +full `enforce_properties` pass is reserved for explicit `rebuild_view` calls +(cold load, fork, error recovery). +""" + +from __future__ import annotations + +import uuid +from pathlib import Path + +import pytest +from pydantic import SecretStr + +from openhands.sdk import LLM, Agent +from openhands.sdk.context.view import View +from openhands.sdk.conversation.event_store import EventLog +from openhands.sdk.conversation.persistence_const import EVENT_FILE_PATTERN, EVENTS_DIR +from openhands.sdk.conversation.state import ConversationState +from openhands.sdk.event.condenser import Condensation +from openhands.sdk.io import InMemoryFileStore +from openhands.sdk.workspace import LocalWorkspace +from tests.sdk.context.view.conftest import message_event + + +@pytest.fixture +def state() -> ConversationState: + """A bare ConversationState with an in-memory event log attached. + + We do not use `ConversationState.create` here because that path also + touches a LocalFileStore on disk; for these unit tests an in-memory + store is sufficient. + """ + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("test-key"), usage_id="test-llm") + agent = Agent(llm=llm) + workspace = LocalWorkspace(working_dir="/tmp/test") + + state = ConversationState( + id=uuid.uuid4(), + workspace=workspace, + persistence_dir=None, + agent=agent, + ) + state._fs = InMemoryFileStore() + state._events = EventLog(state._fs) + state._wire_view_sync() + return state + + +def test_fresh_state_has_empty_view(state: ConversationState) -> None: + assert len(state.view) == 0 + assert state.view.events == [] + + +def test_append_event_updates_both_event_log_and_view( + state: ConversationState, +) -> None: + msg = message_event("hello") + state.append_event(msg) + + # EventLog rehydrates from disk on read, so we compare by id rather than + # identity for the underlying log; the view holds direct references and + # can be compared with `is`. + assert len(state.events) == 1 + assert state.events[0].id == msg.id + assert len(state.view) == 1 + assert state.view.events[0] is msg + + +def test_view_stays_in_parity_with_from_events_after_many_appends( + state: ConversationState, +) -> None: + msgs = [message_event(f"msg {i}") for i in range(5)] + for msg in msgs: + state.append_event(msg) + + rebuilt = View.from_events(state.events) + assert [e.id for e in state.view.events] == [e.id for e in rebuilt.events] + assert ( + state.view.unhandled_condensation_request + == rebuilt.unhandled_condensation_request + ) + + +def test_condensation_event_is_applied_incrementally( + state: ConversationState, +) -> None: + msgs = [message_event(f"msg {i}") for i in range(3)] + for msg in msgs: + state.append_event(msg) + + condensation = Condensation( + forgotten_event_ids={msgs[0].id, msgs[2].id}, + llm_response_id="resp_1", + ) + state.append_event(condensation) + + # The Condensation is still in the underlying log (it is not LLM-convertible + # but is part of the persisted history); the view, however, should reflect + # the condensation by dropping the forgotten messages. + assert len(state.view) == 1 + assert state.view.events[0] is msgs[1] + + +def test_condensation_request_sets_flag_incrementally( + state: ConversationState, +) -> None: + """CondensationRequest should set the unhandled flag without adding + an event to the view (it is not LLMConvertible).""" + from openhands.sdk.event.condenser import CondensationRequest + + state.append_event(CondensationRequest()) + assert state.view.unhandled_condensation_request is True + assert len(state.view) == 0 + + +def test_append_event_does_not_run_enforce_on_hot_path( + state: ConversationState, monkeypatch: pytest.MonkeyPatch +) -> None: + """The hot path must not pay the cost of `enforce_properties`. + + This is the core perf invariant from #3053: incremental updates rely on + manipulation indices keeping the view well-formed, so enforcement should + only run on `rebuild_view`. + """ + call_count = 0 + original_enforce = View.enforce_properties + + def counting_enforce(self: View, all_events): # type: ignore[no-untyped-def] + nonlocal call_count + call_count += 1 + return original_enforce(self, all_events) + + monkeypatch.setattr(View, "enforce_properties", counting_enforce) + + for i in range(10): + state.append_event(message_event(f"msg {i}")) + + assert call_count == 0, ( + "append_event must not invoke enforce_properties on the hot path" + ) + + +def test_rebuild_view_runs_enforce( + state: ConversationState, monkeypatch: pytest.MonkeyPatch +) -> None: + """rebuild_view is the one place where full property enforcement runs.""" + call_count = 0 + original_enforce = View.enforce_properties + + def counting_enforce(self: View, all_events): # type: ignore[no-untyped-def] + nonlocal call_count + call_count += 1 + return original_enforce(self, all_events) + + monkeypatch.setattr(View, "enforce_properties", counting_enforce) + + for i in range(3): + state.append_event(message_event(f"msg {i}")) + + # Sanity: no enforce so far. + assert call_count == 0 + + state.rebuild_view() + assert call_count >= 1 + # Parity check after the rebuild. + assert [e.id for e in state.view.events] == [e.id for e in state.events] + + +def test_rebuild_view_replaces_cached_instance(state: ConversationState) -> None: + """rebuild_view should produce a fresh View instance derived from the log.""" + state.append_event(message_event("hello")) + before = state.view + state.rebuild_view() + after = state.view + + assert before is not after + assert [e.id for e in after.events] == [e.id for e in before.events] + + +def test_direct_eventlog_append_also_updates_view( + state: ConversationState, +) -> None: + """Appending directly via state.events.append must still update the view. + + This verifies the on_append callback wired by _wire_view_sync, so + callers who bypass state.append_event do not silently diverge the + cached view. + """ + msg = message_event("direct") + state.events.append(msg) + + assert len(state.view) == 1 + assert state.view.events[0] is msg + + +def test_view_syncs_when_eventlog_syncs_extra_events( + state: ConversationState, +) -> None: + """If EventLog reports synced_count > 0, the missed events must be + incrementally applied to the view before the new event. + + We simulate this by writing an event file directly to the in-memory + file store so that EventLog._sync_from_disk picks it up during the + next append. + """ + # Manually insert an event file behind EventLog's back, simulating + # another process having written while this state was alive. + sneaky_msg = message_event("sneaky") + payload = sneaky_msg.model_dump_json(exclude_none=True) + path = f"events/{EVENT_FILE_PATTERN.format(idx=0, event_id=sneaky_msg.id)}" + state._fs.write(path, payload) + + # Now append a second event through the normal path. EventLog will + # notice the on-disk file during its lock-and-sync step and report + # synced_count=1, which should trigger incremental application of + # the missed event followed by the new one. + second_msg = message_event("second") + state.append_event(second_msg) + + # Both events should be in the view in the correct order. + assert len(state.view) == 2 + view_ids = [e.id for e in state.view.events] + assert view_ids[0] == sneaky_msg.id + assert view_ids[1] == second_msg.id + + +def test_fresh_create_rebuilds_view_for_orphaned_events( + tmp_path: Path, +) -> None: + """ConversationState.create on a directory that has event files but no + base_state.json (crash / partial cleanup) must still populate the + cached view from the pre-existing events. + + Uses the real ``ConversationState.create()`` factory so that any + regression in the production orphan-handling guard is caught. + """ + llm = LLM(model="gpt-4o-mini", api_key=SecretStr("test-key"), usage_id="test-llm") + agent = Agent(llm=llm) + workspace = LocalWorkspace(working_dir="/tmp/test") + + # Write an orphaned event file on disk with no base_state.json, + # simulating a crash before state was fully saved. + events_dir = tmp_path / EVENTS_DIR + events_dir.mkdir(parents=True) + orphan_msg = message_event("orphan") + event_filename = EVENT_FILE_PATTERN.format(idx=0, event_id=orphan_msg.id) + (events_dir / event_filename).write_text( + orphan_msg.model_dump_json(exclude_none=True) + ) + + state = ConversationState.create( + id=uuid.uuid4(), + agent=agent, + workspace=workspace, + persistence_dir=str(tmp_path), + ) + + # The orphaned event should be visible in the cached view. + assert len(state.view) == 1 + assert state.view.events[0].id == orphan_msg.id + + # Future appends should still work correctly. + new_msg = message_event("new") + state.append_event(new_msg) + assert len(state.view) == 2 + assert state.view.events[1].id == new_msg.id