-
Notifications
You must be signed in to change notification settings - Fork 261
Cache View on ConversationState, update incrementally #3324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c9b7e77
0abd27c
566db5d
ce1bc9c
65f3f99
42f382a
4ade933
4410b99
1e1d4f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ class EventLog(EventsListBase): | |
| _length: int | ||
| _lock_path: str | ||
| _write_guard: Callable[[], AbstractContextManager[None]] | None | ||
| _on_append: Callable[[Event, int], None] | None | ||
|
|
||
| def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: | ||
| self._fs = fs | ||
|
|
@@ -48,6 +49,7 @@ def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: | |
| self._idx_to_id: dict[int, EventID] = {} | ||
| self._lock_path = f"{dir_path}/{LOCK_FILE_NAME}" | ||
| self._write_guard = None | ||
| self._on_append = None | ||
| self._length = self._scan_and_build_index() | ||
|
|
||
| def set_write_guard( | ||
|
|
@@ -56,6 +58,27 @@ def set_write_guard( | |
| ) -> None: | ||
| self._write_guard = write_guard | ||
|
|
||
| def set_on_append( | ||
| self, | ||
| on_append: Callable[[Event, int], None] | None, | ||
| ) -> None: | ||
| """Set a callback invoked after every successful ``append``. | ||
|
|
||
| The callback receives ``(event, synced_count)`` where | ||
| ``synced_count`` is the number of additional events that were | ||
| synced from disk inside this append (non-zero only when another | ||
| process wrote while this process was alive). | ||
|
|
||
| The callback is invoked inside the ``EventLog`` file lock, so | ||
| concurrent appends always execute their callbacks in the same | ||
| order as their writes. However, if the callback mutates shared | ||
| in-memory state (e.g. a cached ``View``), callers must still | ||
| hold an appropriate application-level lock (such as the | ||
| ``ConversationState`` lock) to prevent races with non-append | ||
| readers or writers on that shared state. | ||
| """ | ||
| self._on_append = on_append | ||
|
|
||
| def get_index(self, event_id: EventID) -> int: | ||
| """Return the integer index for a given event_id.""" | ||
| try: | ||
|
|
@@ -129,7 +152,9 @@ def append(self, event: Event) -> None: | |
| with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS): | ||
| # Sync with disk in case another process wrote while we waited | ||
| disk_length = self._count_events_on_disk() | ||
| synced_count = 0 | ||
| if disk_length > self._length: | ||
| synced_count = disk_length - self._length | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟠 Important: |
||
| self._sync_from_disk(disk_length) | ||
|
|
||
| if evt_id in self._id_to_idx: | ||
|
|
@@ -149,6 +174,12 @@ def append(self, event: Event) -> None: | |
| self._idx_to_id[self._length] = evt_id | ||
| self._id_to_idx[evt_id] = self._length | ||
| self._length += 1 | ||
|
|
||
| # Fire the callback inside the file lock so that concurrent | ||
| # appends execute their callbacks in the same order as their | ||
| # writes, preventing view divergence from the persisted log. | ||
| if self._on_append is not None: | ||
| self._on_append(event, synced_count) | ||
| except TimeoutError: | ||
| logger.error( | ||
| f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s " | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -206,7 +206,11 @@ def _default_callback(e): | |
| # This callback runs while holding the conversation state's lock | ||
| # (see BaseConversation.compose_callbacks usage inside `with self._state:` | ||
| # regions), so updating state here is thread-safe. | ||
| self._state.events.append(e) | ||
| # `append_event` persists via EventLog.append, whose `_on_append` | ||
| # callback incrementally updates the cached `state.view`, keeping | ||
| # the view in sync without paying the O(n) `View.from_events` | ||
| # cost on each step. | ||
| self._state.append_event(e) | ||
|
csmith49 marked this conversation as resolved.
csmith49 marked this conversation as resolved.
|
||
| # Track user MessageEvent IDs here so hook callbacks (which may | ||
| # synthesize or alter user messages) are captured in one place. | ||
| if isinstance(e, MessageEvent) and e.source == "user": | ||
|
|
@@ -403,10 +407,21 @@ def fork( | |
| tags=tags, | ||
| ) | ||
|
|
||
| # Deep-copy events from source → fork so the source stays | ||
| # immutable. | ||
| for event in self._state.events: | ||
| fork_conv._state.events.append(event.model_copy(deep=True)) | ||
| # Deep-copy events from source → fork. Suppress the on_append | ||
| # callback during the bulk copy: rebuild_view() at the end will | ||
| # derive the view from scratch, so paying n incremental view | ||
| # updates only to discard them is pure overhead. | ||
| # Nested try/finally ensures _wire_view_sync() restores the | ||
| # callback even if rebuild_view() itself raises. | ||
| fork_conv._state._events.set_on_append(None) | ||
| try: | ||
| for event in self._state.events: | ||
| fork_conv._state._events.append(event.model_copy(deep=True)) | ||
| finally: | ||
| try: | ||
| fork_conv._state.rebuild_view() | ||
| finally: | ||
| fork_conv._state._wire_view_sync() | ||
|
|
||
| # Copy runtime state that accumulated during the source | ||
| # conversation. activated_knowledge_skills is list[str] – strings | ||
|
|
@@ -895,6 +910,9 @@ def run(self) -> None: | |
| ) | ||
| break | ||
| except Exception as e: | ||
| # Hold the state lock while mutating execution_status and | ||
| # emitting the error event so the view update in _on_event | ||
| # cannot race with a concurrent send_message()/run(). | ||
| with self._state: | ||
| self._state.execution_status = ConversationExecutionStatus.ERROR | ||
|
|
||
|
|
@@ -1229,7 +1247,16 @@ def close(self) -> None: | |
| logger.debug("Closing conversation and cleaning up tool executors") | ||
| hook_processor = getattr(self, "_hook_processor", None) | ||
| if hook_processor is not None: | ||
| hook_processor.run_session_end() | ||
| # Session-end hooks emit events through the callback chain, | ||
| # which reaches _default_callback → append_event → view | ||
| # mutation. Hold the state lock so this cannot race with a | ||
| # concurrent run()/send_message(). | ||
| state = getattr(self, "_state", None) | ||
| if state is not None: | ||
| with state: | ||
| hook_processor.run_session_end() | ||
| else: | ||
| hook_processor.run_session_end() | ||
| try: | ||
| self._end_observability_span() | ||
| except AttributeError: | ||
|
|
@@ -1388,14 +1415,13 @@ def condense(self) -> None: | |
| ")" | ||
| ) | ||
|
|
||
| # Add a condensation request event | ||
| # Force the agent to take a single step to process the condensation request. | ||
| # Both the request event emission and the step must be under the state | ||
| # lock so that the view mutation in _on_event cannot race with a | ||
| # concurrent run(). | ||
| condensation_request = CondensationRequest() | ||
| self._on_event(condensation_request) | ||
|
|
||
| # Force the agent to take a single step to process the condensation request | ||
| # This will trigger the condenser if it handles condensation requests | ||
| with self._state: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟠 Important: This lock does not make |
||
| # Take a single step to process the condensation request | ||
| self._on_event(condensation_request) | ||
| self.agent.step(self, on_event=self._on_event, on_token=self._on_token) | ||
|
|
||
| logger.info("Condensation request processed") | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pydantic import Field, PrivateAttr | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from openhands.sdk.agent.base import AgentBase | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from openhands.sdk.context.view import View | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from openhands.sdk.conversation.conversation_stats import ConversationStats | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from openhands.sdk.conversation.event_store import EventLog | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from openhands.sdk.conversation.fifo_lock import FIFOLock | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -205,6 +206,13 @@ class ConversationState(OpenHandsModel): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # ===== Private attrs (NOT Fields) ===== | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _fs: FileStore = PrivateAttr() # filestore for persistence | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _events: EventLog = PrivateAttr() # now the storage for events | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Cached, incrementally-maintained projection of `_events`. Derived state, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # so it is intentionally not a model field and never persisted. Kept in | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # sync with the event log via the `_on_append` callback wired by | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # `_wire_view_sync`, and fully re-derived (with property enforcement) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # via `rebuild_view` on cold load, fork, or error recovery. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # See https://github.com/OpenHands/software-agent-sdk/issues/3053. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _view: View = PrivateAttr(default_factory=View) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _cipher: Cipher | None = PrivateAttr(default=None) # cipher for secret encryption | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _autosave_enabled: bool = PrivateAttr( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default=False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -225,6 +233,95 @@ class ConversationState(OpenHandsModel): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def events(self) -> EventLog: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self._events | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def view(self) -> View: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Cached, incrementally-maintained `View` of the conversation events. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The view is kept in sync with the event log automatically: every | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``EventLog.append`` triggers an ``_on_append`` callback that | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| performs either an O(1) incremental update or, when the log | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| synced extra events from disk, a full ``View.from_events`` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| rebuild. The view is also fully re-derived (with property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| enforcement) by ``rebuild_view``, intended for cold load, fork, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| and explicit error recovery. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Callers must treat the returned view as read-only. Mutating it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| breaks the cache invariant and will cause silent divergence from | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``events``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self._view | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def append_event(self, event: Event) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Append an event to the conversation, updating the cached view. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This is the preferred write path for adding events to a running | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| conversation. It persists the event via ``EventLog.append``, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| which in turn fires the ``_on_append`` callback to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| incrementally update the cached ``View``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The incremental view update is O(1) for the common | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``LLMConvertibleEvent`` case and O(view-size) only when a | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``Condensation`` event is applied. It does not run | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``enforce_properties``; that fallback path runs only via | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``rebuild_view``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The view cache is also maintained when events are appended | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| directly via ``state.events.append(event)`` (e.g. by external | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| callers), because the same ``_on_append`` callback is wired | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| to ``EventLog``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._events.append(event) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
csmith49 marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def rebuild_view(self) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Re-derive the cached view from the full event log. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Runs `View.from_events` over the persisted log, which applies all | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| view property enforcement. This is the fallback path described in | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| `ViewPropertyBase` and should be used only on: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Cold load (resuming a persisted `ConversationState`), since the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| on-disk events may have been written by an older version or | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| could be corrupted. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Fork creation, after deep-copying events from the source state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - Explicit error recovery (e.g., when the LLM rejects the current | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| message history as malformed and we want to re-validate the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| view before retrying with condensation). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._view = View.from_events(self._events) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _wire_view_sync(self) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Wire the ``EventLog._on_append`` callback to keep ``_view`` current. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Must be called after ``_events`` is assigned. Subsequent calls to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``EventLog.append`` — whether via ``self.append_event`` or a | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| direct ``self.events.append`` — will automatically update the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cached ``_view``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Thread-safety note: the callback executes inside the ``EventLog`` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| file lock, which serializes writes. Callers of | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``EventLog.append`` must additionally hold the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``ConversationState`` lock (``with self._state:``) to prevent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| races between the ``_view`` mutation here and non-append readers | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| or writers. All production call sites in ``LocalConversation`` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
csmith49 marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| satisfy this — see ``_default_callback``, ``fork``, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``condense``, ``run`` (error handler), and ``close`` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (session-end hooks). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _sync(event: Event, synced_count: int) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if synced_count > 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Another writer added events between our last append and | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # this one. Apply only the missed events incrementally | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # rather than rebuilding the entire view, so the lock | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # hold stays O(synced_count) instead of O(N). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new_len = len(self._events) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start = new_len - synced_count - 1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for i in range(start, new_len - 1): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._view.append_event(self._events[i]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion: A short inline comment would make this assumption explicit:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._view.append_event(event) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._events.set_on_append(_sync) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def env_observation_persistence_dir(self) -> str | None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Directory for persisting environment observation files.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -355,6 +452,14 @@ def create( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._events = EventLog(file_store, dir_path=EVENTS_DIR) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._cipher = cipher | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Build the cached view from the persisted event log. This is the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # one place where we pay the full `View.from_events` cost (which | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # runs property enforcement) — events may have been written by an | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # older version of the code or otherwise be in a bad state, so | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # treating this as a cold-load checkpoint is appropriate. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state.rebuild_view() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._wire_view_sync() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Verify compatibility (agent class + tools) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| agent.verify(state.agent, events=state._events) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -387,6 +492,23 @@ def create( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._fs = file_store | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._events = EventLog(file_store, dir_path=EVENTS_DIR) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._wire_view_sync() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
csmith49 marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Guard against orphaned event files from a previous incomplete | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # cleanup or crash: base_state.json is missing (fresh path) but | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # events/ may already contain files. Without this rebuild the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # cached view would start empty while the log is non-empty, and | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # future appends would never pick up the pre-existing events. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # If the orphaned files are corrupted we fall back to an empty | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # view — they will still surface errors on explicit iteration. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if len(state._events) > 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state.rebuild_view() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Failed to rebuild view from orphaned events for " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "conversation %s; starting with empty view", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+495
to
+511
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion: The fresh-create path wires the callback before building from orphaned events (wire → rebuild), while the resume path above does the opposite (rebuild → wire). Both are correct — no concurrent access can occur during construction — but the inconsistency can mislead a future reader. Moving
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state._cipher = cipher | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| state.stats = ConversationStats() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.