Skip to content
Closed
11 changes: 11 additions & 0 deletions openhands-sdk/openhands/sdk/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,13 @@ def step(
"triggering condensation retry with condensed history: "
f"{e}"
)
# The incrementally maintained view we just sent may itself be the
# source of the malformed history (e.g., a manipulation-indices bug
# that let a view-property violation slip through). Re-derive it from
# the event log with full `enforce_properties` before kicking off the
# condensation retry, so the condenser operates on a clean, enforced
# view.
# See https://github.com/OpenHands/software-agent-sdk/issues/3053.
state.rebuild_view()
Comment thread
csmith49 marked this conversation as resolved.
on_event(CondensationRequest())
return
logger.warning(
Expand Down Expand Up @@ -769,6 +776,10 @@ async def astep(
"history: %s",
e,
)
# Re-derive the cached view with full enforce_properties
# before the condensation retry — mirrors step(). See
# https://github.com/OpenHands/software-agent-sdk/issues/3053.
state.rebuild_view()
on_event(CondensationRequest())
return
logger.warning(
Expand Down
6 changes: 6 additions & 0 deletions openhands-sdk/openhands/sdk/context/condenser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def condense(self, view: View, agent_llm: LLM | None = None) -> View | Condensat

Args:
view: A view of the history containing all events that should be condensed.
**Implementations must treat this view as read-only.** The view may be
a cached projection owned by `ConversationState`
(see https://github.com/OpenHands/software-agent-sdk/issues/3053), and
mutating it in place will corrupt that cache and silently desynchronize
the conversation from its event log. Return a new `View` (e.g.
`View(events=view.events[k:])`) or a `Condensation` event instead.
agent_llm: LLM instance used by the agent. Condensers use this for token
counting purposes. Defaults to None.

Expand Down
31 changes: 31 additions & 0 deletions openhands-sdk/openhands/sdk/conversation/event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class EventLog(EventsListBase):
_length: int
_lock_path: str
_write_guard: Callable[[], AbstractContextManager[None]] | None
_on_append: Callable[[Event, int], None] | None

def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None:
self._fs = fs
Expand All @@ -48,6 +49,7 @@ def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None:
self._idx_to_id: dict[int, EventID] = {}
self._lock_path = f"{dir_path}/{LOCK_FILE_NAME}"
self._write_guard = None
self._on_append = None
self._length = self._scan_and_build_index()

def set_write_guard(
Expand All @@ -56,6 +58,27 @@ def set_write_guard(
) -> None:
self._write_guard = write_guard

def set_on_append(
    self,
    on_append: Callable[[Event, int], None] | None,
) -> None:
    """Register (or clear) the callback invoked after each successful ``append``.

    Args:
        on_append: Callable receiving ``(event, synced_count)``, or
            ``None`` to remove a previously registered callback
            (``append`` skips the call when the stored value is ``None``).
            ``synced_count`` is the number of additional events that
            ``append`` detected on disk before writing, computed as the
            on-disk event count minus the in-memory length; it is ``0``
            unless the disk held more events than this instance had
            indexed (intended scenario: another process wrote while this
            process was alive).

    The callback is invoked inside the ``EventLog`` file lock, after the
    new event has been written and indexed, so concurrent appends always
    execute their callbacks in the same order as their writes. However,
    if the callback mutates shared in-memory state (e.g. a cached
    ``View``), callers must still hold an appropriate application-level
    lock (such as the ``ConversationState`` lock) to prevent races with
    non-append readers or writers on that shared state.
    """
    self._on_append = on_append

def get_index(self, event_id: EventID) -> int:
"""Return the integer index for a given event_id."""
try:
Expand Down Expand Up @@ -129,7 +152,9 @@ def append(self, event: Event) -> None:
with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS):
# Sync with disk in case another process wrote while we waited
disk_length = self._count_events_on_disk()
synced_count = 0
if disk_length > self._length:
synced_count = disk_length - self._length
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Important: synced_count is based on _count_events_on_disk(), but that helper counts any event-*.json file while _sync_from_disk() only indexes contiguous files matching EVENT_NAME_RE. A stale/corrupt file or index gap can therefore report missed events that are not actually readable; _wire_view_sync() then indexes those missing entries after the new event has already been written, so append() raises and leaves the persisted log and cached view inconsistent. Please derive synced_count from the actual indexed delta after syncing (and avoid inflating _length from the raw file count), or pass the synced events/indices explicitly.

self._sync_from_disk(disk_length)

if evt_id in self._id_to_idx:
Expand All @@ -149,6 +174,12 @@ def append(self, event: Event) -> None:
self._idx_to_id[self._length] = evt_id
self._id_to_idx[evt_id] = self._length
self._length += 1

# Fire the callback inside the file lock so that concurrent
# appends execute their callbacks in the same order as their
# writes, preventing view divergence from the persisted log.
if self._on_append is not None:
self._on_append(event, synced_count)
except TimeoutError:
logger.error(
f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s "
Expand Down
50 changes: 38 additions & 12 deletions openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ def _default_callback(e):
# This callback runs while holding the conversation state's lock
# (see BaseConversation.compose_callbacks usage inside `with self._state:`
# regions), so updating state here is thread-safe.
self._state.events.append(e)
# `append_event` persists via EventLog.append, whose `_on_append`
# callback incrementally updates the cached `state.view`, keeping
# the view in sync without paying the O(n) `View.from_events`
# cost on each step.
self._state.append_event(e)
Comment thread
csmith49 marked this conversation as resolved.
Comment thread
csmith49 marked this conversation as resolved.
# Track user MessageEvent IDs here so hook callbacks (which may
# synthesize or alter user messages) are captured in one place.
if isinstance(e, MessageEvent) and e.source == "user":
Expand Down Expand Up @@ -403,10 +407,21 @@ def fork(
tags=tags,
)

# Deep-copy events from source → fork so the source stays
# immutable.
for event in self._state.events:
fork_conv._state.events.append(event.model_copy(deep=True))
# Deep-copy events from source → fork. Suppress the on_append
# callback during the bulk copy: rebuild_view() at the end will
# derive the view from scratch, so paying n incremental view
# updates only to discard them is pure overhead.
# Nested try/finally ensures _wire_view_sync() restores the
# callback even if rebuild_view() itself raises.
fork_conv._state._events.set_on_append(None)
try:
for event in self._state.events:
fork_conv._state._events.append(event.model_copy(deep=True))
finally:
try:
fork_conv._state.rebuild_view()
finally:
fork_conv._state._wire_view_sync()

# Copy runtime state that accumulated during the source
# conversation. activated_knowledge_skills is list[str] – strings
Expand Down Expand Up @@ -895,6 +910,9 @@ def run(self) -> None:
)
break
except Exception as e:
# Hold the state lock while mutating execution_status and
# emitting the error event so the view update in _on_event
# cannot race with a concurrent send_message()/run().
with self._state:
self._state.execution_status = ConversationExecutionStatus.ERROR

Expand Down Expand Up @@ -1229,7 +1247,16 @@ def close(self) -> None:
logger.debug("Closing conversation and cleaning up tool executors")
hook_processor = getattr(self, "_hook_processor", None)
if hook_processor is not None:
hook_processor.run_session_end()
# Session-end hooks emit events through the callback chain,
# which reaches _default_callback → append_event → view
# mutation. Hold the state lock so this cannot race with a
# concurrent run()/send_message().
state = getattr(self, "_state", None)
if state is not None:
with state:
hook_processor.run_session_end()
else:
hook_processor.run_session_end()
try:
self._end_observability_span()
except AttributeError:
Expand Down Expand Up @@ -1388,14 +1415,13 @@ def condense(self) -> None:
")"
)

# Add a condensation request event
# Force the agent to take a single step to process the condensation request.
# Both the request event emission and the step must be under the state
# lock so that the view mutation in _on_event cannot race with a
# concurrent run().
condensation_request = CondensationRequest()
self._on_event(condensation_request)

# Force the agent to take a single step to process the condensation request
# This will trigger the condenser if it handles condensation requests
with self._state:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Important: This lock does not make condense() wait for an in-flight arun() on the same event-loop thread. FIFOLock is reentrant by thread, and arun() holds it across await self.agent.astep(...), so another asyncio task can re-enter here, append a CondensationRequest, and run a synchronous agent.step() while the async step is still awaiting. That violates the method's wait guarantee and can order condensation events ahead of the in-flight step's response in both the event log and cached view. Please guard/coordinate with _arun_task before emitting the request (reject, interrupt/wait, or provide a task-aware async path).

# Take a single step to process the condensation request
self._on_event(condensation_request)
self.agent.step(self, on_event=self._on_event, on_token=self._on_token)

logger.info("Condensation request processed")
Expand Down
122 changes: 122 additions & 0 deletions openhands-sdk/openhands/sdk/conversation/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydantic import Field, PrivateAttr

from openhands.sdk.agent.base import AgentBase
from openhands.sdk.context.view import View
from openhands.sdk.conversation.conversation_stats import ConversationStats
from openhands.sdk.conversation.event_store import EventLog
from openhands.sdk.conversation.fifo_lock import FIFOLock
Expand Down Expand Up @@ -205,6 +206,13 @@ class ConversationState(OpenHandsModel):
# ===== Private attrs (NOT Fields) =====
_fs: FileStore = PrivateAttr() # filestore for persistence
_events: EventLog = PrivateAttr() # now the storage for events
# Cached, incrementally-maintained projection of `_events`. Derived state,
# so it is intentionally not a model field and never persisted. Kept in
# sync with the event log via the `_on_append` callback wired by
# `_wire_view_sync`, and fully re-derived (with property enforcement)
# via `rebuild_view` on cold load, fork, or error recovery.
# See https://github.com/OpenHands/software-agent-sdk/issues/3053.
_view: View = PrivateAttr(default_factory=View)
_cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption
_autosave_enabled: bool = PrivateAttr(
default=False
Expand All @@ -225,6 +233,95 @@ class ConversationState(OpenHandsModel):
def events(self) -> EventLog:
return self._events

@property
def view(self) -> View:
    """Return the cached, incrementally maintained ``View`` of the events.

    The view is kept in sync with the event log through the
    ``_on_append`` callback installed by ``_wire_view_sync``: every
    ``EventLog.append`` applies the new event to the view incrementally.
    When the log synced extra events from disk during that append, the
    missed events are applied incrementally first, followed by the new
    event; no full rebuild happens on this path.

    A full ``View.from_events`` re-derivation (with property
    enforcement) happens only in ``rebuild_view``, which is intended
    for cold load, fork, and explicit error recovery.

    Callers must treat the returned view as read-only. Mutating it
    breaks the cache invariant and will cause silent divergence from
    ``events``.
    """
    return self._view

def append_event(self, event: Event) -> None:
    """Append an event to the conversation, updating the cached view.

    This is the preferred write path for adding events to a running
    conversation. It delegates to ``EventLog.append``, which persists
    the event and then fires the ``_on_append`` callback that
    incrementally updates the cached ``View``.

    Args:
        event: The event to persist and fold into the cached view.

    The incremental view update is O(1) for the common
    ``LLMConvertibleEvent`` case and O(view-size) only when a
    ``Condensation`` event is applied. It does not run
    ``enforce_properties``; that fallback path runs only via
    ``rebuild_view``.

    The view cache is also maintained when events are appended
    directly via ``state.events.append(event)`` (e.g. by external
    callers), because the same ``_on_append`` callback is wired
    to ``EventLog``.
    """
    # All view maintenance happens in the EventLog callback; nothing
    # else needs to be done here.
    self._events.append(event)
Comment thread
csmith49 marked this conversation as resolved.

def rebuild_view(self) -> None:
    """Re-derive the cached view from the full event log.

    Runs `View.from_events` over the persisted log, which applies all
    view property enforcement, and stores the result as the new cached
    view. This is the fallback path described in `ViewPropertyBase` and
    should be used only on:

    - Cold load (resuming a persisted `ConversationState`), since the
      on-disk events may have been written by an older version or
      could be corrupted.
    - Fork creation, after deep-copying events from the source state.
    - Explicit error recovery (e.g., when the LLM rejects the current
      message history as malformed and we want to re-validate the
      view before retrying with condensation).
    """
    # Rebinds `_view` to a freshly built object rather than mutating the
    # existing one, so references obtained earlier via `view` keep
    # pointing at the old, now-stale view.
    self._view = View.from_events(self._events)

def _wire_view_sync(self) -> None:
"""Wire the ``EventLog._on_append`` callback to keep ``_view`` current.

Must be called after ``_events`` is assigned. Subsequent calls to
``EventLog.append`` — whether via ``self.append_event`` or a
direct ``self.events.append`` — will automatically update the
cached ``_view``.

Thread-safety note: the callback executes inside the ``EventLog``
file lock, which serializes writes. Callers of
``EventLog.append`` must additionally hold the
``ConversationState`` lock (``with self._state:``) to prevent
races between the ``_view`` mutation here and non-append readers
or writers. All production call sites in ``LocalConversation``
Comment thread
csmith49 marked this conversation as resolved.
satisfy this — see ``_default_callback``, ``fork``,
``condense``, ``run`` (error handler), and ``close``
(session-end hooks).
"""

def _sync(event: Event, synced_count: int) -> None:
if synced_count > 0:
# Another writer added events between our last append and
# this one. Apply only the missed events incrementally
# rather than rebuilding the entire view, so the lock
# hold stays O(synced_count) instead of O(N).
new_len = len(self._events)
start = new_len - synced_count - 1
for i in range(start, new_len - 1):
self._view.append_event(self._events[i])
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: self._events[i] is called here while the EventLog file lock is held (the _on_append callback fires inside with self._fs.lock(...)). This is safe today because EventLog._get_single_item reads from the FileStore directly without re-acquiring the lock. But the safety is an implicit structural assumption — a future maintainer who adds locking inside __getitem__ (e.g., for a thread-safe read variant) would silently introduce a deadlock.

A short inline comment would make this assumption explicit:

Suggested change
self._view.append_event(self._events[i])
# Safe inside the EventLog file lock: _get_single_item
# reads via FileStore directly, no re-entrant lock.
self._view.append_event(self._events[i])

self._view.append_event(event)

self._events.set_on_append(_sync)

@property
def env_observation_persistence_dir(self) -> str | None:
"""Directory for persisting environment observation files."""
Expand Down Expand Up @@ -355,6 +452,14 @@ def create(
state._events = EventLog(file_store, dir_path=EVENTS_DIR)
state._cipher = cipher

# Build the cached view from the persisted event log. This is the
# one place where we pay the full `View.from_events` cost (which
# runs property enforcement) — events may have been written by an
# older version of the code or otherwise be in a bad state, so
# treating this as a cold-load checkpoint is appropriate.
state.rebuild_view()
state._wire_view_sync()

# Verify compatibility (agent class + tools)
agent.verify(state.agent, events=state._events)

Expand Down Expand Up @@ -387,6 +492,23 @@ def create(
)
state._fs = file_store
state._events = EventLog(file_store, dir_path=EVENTS_DIR)
state._wire_view_sync()
Comment thread
csmith49 marked this conversation as resolved.
# Guard against orphaned event files from a previous incomplete
# cleanup or crash: base_state.json is missing (fresh path) but
# events/ may already contain files. Without this rebuild the
# cached view would start empty while the log is non-empty, and
# future appends would never pick up the pre-existing events.
# If the orphaned files are corrupted we fall back to an empty
# view — they will still surface errors on explicit iteration.
if len(state._events) > 0:
try:
state.rebuild_view()
except Exception:
logger.warning(
"Failed to rebuild view from orphaned events for "
"conversation %s; starting with empty view",
state.id,
)
Comment on lines +495 to +511
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: The fresh-create path wires the callback before building from orphaned events (wire → rebuild), while the resume path above does the opposite (rebuild → wire). Both are correct — no concurrent access can occur during construction — but the inconsistency can mislead a future reader.

Moving _wire_view_sync() to after the orphan-rebuild block would match the resume path's "build then wire" discipline:

Suggested change
state._wire_view_sync()
# Guard against orphaned event files from a previous incomplete
# cleanup or crash: base_state.json is missing (fresh path) but
# events/ may already contain files. Without this rebuild the
# cached view would start empty while the log is non-empty, and
# future appends would never pick up the pre-existing events.
# If the orphaned files are corrupted we fall back to an empty
# view — they will still surface errors on explicit iteration.
if len(state._events) > 0:
try:
state.rebuild_view()
except Exception:
logger.warning(
"Failed to rebuild view from orphaned events for "
"conversation %s; starting with empty view",
state.id,
)
# Guard against orphaned event files from a previous incomplete
# cleanup or crash: base_state.json is missing (fresh path) but
# events/ may already contain files. Without this rebuild the
# cached view would start empty while the log is non-empty, and
# future appends would never pick up the pre-existing events.
# If the orphaned files are corrupted we fall back to an empty
# view — they will still surface errors on explicit iteration.
if len(state._events) > 0:
try:
state.rebuild_view()
except Exception:
logger.warning(
"Failed to rebuild view from orphaned events for "
"conversation %s; starting with empty view",
state.id,
)
state._wire_view_sync()

state._cipher = cipher
state.stats = ConversationStats()

Expand Down
Loading
Loading