Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions openhands-sdk/openhands/sdk/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,10 @@ def step(
"triggering condensation retry with condensed history: "
f"{e}"
)
# The incremental view may itself be the source of the
# malformed history. Re-derive with full enforcement so
# the condenser operates on a clean view.
state.rebuild_view()
Comment thread
csmith49 marked this conversation as resolved.
on_event(CondensationRequest())
return
logger.warning(
Expand Down Expand Up @@ -769,6 +773,9 @@ async def astep(
"history: %s",
e,
)
# Mirror step(): re-derive the cached view with full
# enforcement before the condensation retry.
state.rebuild_view()
on_event(CondensationRequest())
return
logger.warning(
Expand Down
5 changes: 5 additions & 0 deletions openhands-sdk/openhands/sdk/context/condenser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat

Args:
view: A view of the history containing all events that should be condensed.
**Implementations must treat this view as read-only.** The view may be
a cached projection owned by ``ConversationState``
(see https://github.com/OpenHands/software-agent-sdk/issues/3053), and
mutating it in place will corrupt that cache. Return a new ``View``
(e.g. ``View(events=view.events[k:])``) or a ``Condensation`` instead.
agent_llm: LLM instance used by the agent. Condensers use this for token
counting purposes. Defaults to None.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ def fork(
# immutable.
for event in self._state.events:
fork_conv._state.events.append(event.model_copy(deep=True))
# Full rebuild: the copied events may need property enforcement
# (same posture as cold load).
fork_conv._state.rebuild_view()

# Copy runtime state that accumulated during the source
# conversation. activated_knowledge_skills is list[str] – strings
Expand Down
73 changes: 73 additions & 0 deletions openhands-sdk/openhands/sdk/conversation/state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# state.py
import json
import threading
from collections.abc import Callable, Sequence
from contextlib import AbstractContextManager
from enum import Enum
Expand All @@ -9,6 +10,7 @@
from pydantic import Field, PrivateAttr

from openhands.sdk.agent.base import AgentBase
from openhands.sdk.context.view import View
from openhands.sdk.conversation.conversation_stats import ConversationStats
from openhands.sdk.conversation.event_store import EventLog
from openhands.sdk.conversation.fifo_lock import FIFOLock
Expand Down Expand Up @@ -205,6 +207,12 @@ class ConversationState(OpenHandsModel):
# ===== Private attrs (NOT Fields) =====
_fs: FileStore = PrivateAttr() # filestore for persistence
_events: EventLog = PrivateAttr() # now the storage for events
# Cached projection of `_events`, lazily updated on read via a
# watermark. Derived state — never persisted, never serialized.
# See https://github.com/OpenHands/software-agent-sdk/issues/3053.
_view: View = PrivateAttr(default_factory=View)
_view_watermark: int = PrivateAttr(default=0)
_view_lock: threading.RLock = PrivateAttr(default_factory=threading.RLock)
Comment thread
csmith49 marked this conversation as resolved.
_cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption
_autosave_enabled: bool = PrivateAttr(
default=False
Expand All @@ -225,6 +233,66 @@ class ConversationState(OpenHandsModel):
def events(self) -> EventLog:
return self._events

@property
def view(self) -> View:
"""Lazily-updated, incrementally-maintained ``View`` of the events.

The view is brought up to date by replaying only the events
appended since the last read (tracked by an internal watermark).
This is O(k) where k is the number of new events — typically 2–4
per agent step — rather than O(n) over the entire history.

``enforce_properties`` is *not* run on the incremental path.
Full enforcement happens only via ``rebuild_view()``, which is
called on cold load, fork, and error recovery.

Callers must treat the returned view as read-only. This
reference is also invalidated by any call to ``rebuild_view()``;
re-read ``state.view`` after any rebuild if you need a fresh
snapshot.
"""
Comment thread
csmith49 marked this conversation as resolved.
Comment thread
csmith49 marked this conversation as resolved.
with self._view_lock:
n = len(self._events)
for i in range(self._view_watermark, n):
try:
self._view.append_event(self._events[i])
self._view_watermark = i + 1
except Exception:
Comment thread
csmith49 marked this conversation as resolved.
logger.warning(
"Incremental view append failed at index %d; "
"rebuilding from scratch.",
i,
exc_info=True,
)
self._view = View.from_events(self._events)
self._view_watermark = len(self._events)
break
return self._view

def rebuild_view(self) -> None:
"""Re-derive the cached view from the full event log.

Runs ``View.from_events`` which applies all view-property
enforcement. This is the recovery / cold-load path described
in ``ViewPropertyBase`` and should be called only on:

- Cold load (resuming a persisted ``ConversationState``).
- Fork creation, after deep-copying events from the source.
- Explicit error recovery (e.g. malformed-history retry).

Any ``View`` reference previously returned by ``state.view``
is invalidated after this call and must not be used — it
will never reflect new events or the rebuilt state.

If ``View.from_events`` raises (e.g. due to corrupted events),
the cache is left unchanged and the exception propagates to
the caller. ``state.view`` continues to serve the pre-rebuild
state until a successful ``rebuild_view()`` call.
"""
with self._view_lock:
self._view = View.from_events(self._events)
self._view_watermark = len(self._events)
Comment thread
csmith49 marked this conversation as resolved.

@property
def env_observation_persistence_dir(self) -> str | None:
"""Directory for persisting environment observation files."""
Expand Down Expand Up @@ -355,6 +423,11 @@ def create(
state._events = EventLog(file_store, dir_path=EVENTS_DIR)
state._cipher = cipher

# Cold-load: rebuild the cached view with full property
# enforcement — persisted events may come from an older code
# version or be corrupted.
state.rebuild_view()

# Verify compatibility (agent class + tools)
agent.verify(state.agent, events=state._events)

Expand Down
58 changes: 58 additions & 0 deletions tests/sdk/agent/test_agent_context_window_condensation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TYPE_CHECKING
from unittest.mock import patch

import pytest
from pydantic import PrivateAttr
Expand Down Expand Up @@ -52,12 +53,24 @@ def completion(self, *, messages, tools=None, **kwargs): # type: ignore[overrid
"immediately after"
)

async def acompletion(self, *, messages, tools=None, **kwargs): # type: ignore[override]
raise LLMMalformedConversationHistoryError(
"messages.134: `tool_use` ids were found without `tool_result` blocks "
"immediately after"
)

def responses(self, *, messages, tools=None, **kwargs): # type: ignore[override]
raise LLMMalformedConversationHistoryError(
"messages.134: `tool_use` ids were found without `tool_result` blocks "
"immediately after"
)

async def aresponses(self, *, messages, tools=None, **kwargs): # type: ignore[override]
raise LLMMalformedConversationHistoryError(
"messages.134: `tool_use` ids were found without `tool_result` blocks "
"immediately after"
)


class HandlesRequestsCondenser(CondenserBase):
def condense(
Expand Down Expand Up @@ -178,6 +191,51 @@ def test_agent_logs_warning_when_no_condenser_on_ctx_exceeded(
assert any("test-model" in record.message for record in caplog.records)


@pytest.mark.parametrize("force_responses", [True, False])
def test_agent_rebuilds_view_on_malformed_history_recovery(
force_responses: bool,
):
"""rebuild_view is called before CondensationRequest on malformed history."""
llm = MalformedHistoryRaisingLLM(force_responses=force_responses)
agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser())
convo = Conversation(agent=agent)
convo._ensure_agent_ready()

seen: list = []
with patch.object(
type(convo._state),
Comment thread
csmith49 marked this conversation as resolved.
Comment thread
csmith49 marked this conversation as resolved.
"rebuild_view",
wraps=convo._state.rebuild_view,
) as mock_rebuild:
agent.step(convo, on_event=lambda e: seen.append(e))
assert mock_rebuild.call_count == 1

assert any(isinstance(e, CondensationRequest) for e in seen)


@pytest.mark.parametrize("force_responses", [True, False])
@pytest.mark.asyncio
async def test_agent_rebuilds_view_on_malformed_history_recovery_async(
force_responses: bool,
):
"""Async parity: astep calls rebuild_view before condensation retry."""
llm = MalformedHistoryRaisingLLM(force_responses=force_responses)
agent = Agent(llm=llm, tools=[], condenser=HandlesRequestsCondenser())
convo = Conversation(agent=agent)
convo._ensure_agent_ready()

seen: list = []
with patch.object(
type(convo._state),
Comment thread
csmith49 marked this conversation as resolved.
"rebuild_view",
wraps=convo._state.rebuild_view,
) as mock_rebuild:
await agent.astep(convo, on_event=lambda e: seen.append(e))
assert mock_rebuild.call_count == 1

assert any(isinstance(e, CondensationRequest) for e in seen)


class NoHandlesRequestsCondenser(CondenserBase):
"""A condenser that doesn't handle condensation requests."""

Expand Down
Loading
Loading