From cf3e11928cfb7480db9fe8bf045c918e83e3abe6 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 14 Nov 2025 13:26:42 +0100 Subject: [PATCH 1/5] Persist automatically when exiting EventManager context Add test. Remove redundant persist event in BasicCrawler. Add stress test to flaky test --- src/crawlee/crawlers/_basic/_basic_crawler.py | 5 +---- src/crawlee/events/_event_manager.py | 4 +++- tests/unit/crawlers/_basic/test_basic_crawler.py | 4 ++-- tests/unit/events/test_event_manager.py | 12 ++++++++++++ 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index b5e2e5f0aa..74d2aaff13 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -56,7 +56,7 @@ SessionError, UserDefinedErrorHandlerError, ) -from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData +from crawlee.events._types import Event, EventCrawlerStatusData from crawlee.http_clients import ImpitHttpClient from crawlee.router import Router from crawlee.sessions import SessionPool @@ -751,9 +751,6 @@ async def _run_crawler(self) -> None: await self._autoscaled_pool.run() - # Emit PERSIST_STATE event when crawler is finishing to allow listeners to persist their state if needed - event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False)) - async def add_requests( self, requests: Sequence[str | Request], diff --git a/src/crawlee/events/_event_manager.py b/src/crawlee/events/_event_manager.py index 4d6936e71d..65a41dd0d0 100644 --- a/src/crawlee/events/_event_manager.py +++ b/src/crawlee/events/_event_manager.py @@ -130,11 +130,13 @@ async def __aexit__( if not self._active: raise RuntimeError(f'The {self.__class__.__name__} is not active.') + # Stop persist state event periodic emission and manually emit last one to ensure latest state is saved. 
+ await self._emit_persist_state_event_rec_task.stop() + await self._emit_persist_state_event() await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout) self._event_emitter.remove_all_listeners() self._listener_tasks.clear() self._listeners_to_wrappers.clear() - await self._emit_persist_state_event_rec_task.stop() self._active = False @overload diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 4cbed2c4e2..298a1157d0 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -1673,8 +1673,8 @@ def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsSta return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir)) -@pytest.mark.skip(reason='This test is flaky, see https://github.com/apify/crawlee-python/issues/1560.') -async def test_crawler_statistics_persistence(tmp_path: Path) -> None: +@pytest.mark.parametrize('_', range(100)) +async def test_crawler_statistics_persistence(tmp_path: Path, _: int) -> None: """Test that crawler statistics persist and are loaded correctly. 
 This test simulates starting the crawler process twice, and checks that the statistics include first run.""" diff --git a/tests/unit/events/test_event_manager.py b/tests/unit/events/test_event_manager.py index 05ac60a52c..4654efaf64 100644 --- a/tests/unit/events/test_event_manager.py +++ b/tests/unit/events/test_event_manager.py @@ -5,6 +5,7 @@ from datetime import timedelta from functools import update_wrapper from typing import TYPE_CHECKING, Any +from unittest import mock from unittest.mock import AsyncMock, MagicMock import pytest @@ -207,3 +208,14 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event await event_manager.wait_for_all_listeners_to_complete() assert event_manager.active is True + + +async def test_event_manager_in_context_persistence() -> None: + """Test that exiting the `EventManager` context emits the persist state event at least once.""" + event_manager = EventManager() + + with mock.patch.object(event_manager, '_emit_persist_state_event', AsyncMock()) as mocked_emit_persist_state_event: + async with event_manager: + pass + + assert mocked_emit_persist_state_event.call_count >= 1 From f7acfc5e2be28f8b9e5509f196c824c777b289b4 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 14 Nov 2025 15:00:20 +0100 Subject: [PATCH 2/5] Change crawler_runtime to computed_field as it really is one --- src/crawlee/statistics/_models.py | 15 ++++++++++++++- src/crawlee/statistics/_statistics.py | 23 ++--------------------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/src/crawlee/statistics/_models.py b/src/crawlee/statistics/_models.py index 4810b0511b..3c94f7f4f6 100644 --- a/src/crawlee/statistics/_models.py +++ b/src/crawlee/statistics/_models.py @@ -76,7 +76,6 @@ class StatisticsState(BaseModel): crawler_started_at: Annotated[datetime | None, Field(alias='crawlerStartedAt')] = None crawler_last_started_at: Annotated[datetime | None, Field(alias='crawlerLastStartTimestamp')] = None crawler_finished_at: 
Annotated[datetime | None, Field(alias='crawlerFinishedAt')] = None - crawler_runtime: Annotated[timedelta_ms, Field(alias='crawlerRuntimeMillis')] = timedelta() errors: dict[str, Any] = Field(default_factory=dict) retry_errors: dict[str, Any] = Field(alias='retryErrors', default_factory=dict) requests_with_status_code: dict[str, int] = Field(alias='requestsWithStatusCode', default_factory=dict) @@ -93,6 +92,20 @@ class StatisticsState(BaseModel): ), ] = {} + # Used to track the crawler runtime, that had already been persisted. This is the runtime from previous runs. + _runtime_offset: Annotated[timedelta, Field(exclude=True)] = timedelta() + + def model_post_init(self, /, __context: Any) -> None: + self._runtime_offset = self.crawler_runtime or self._runtime_offset + + @computed_field(alias='crawlerRuntimeMillis') # type: ignore[prop-decorator] + @property + def crawler_runtime(self) -> timedelta: + if self.crawler_last_started_at: + finished_at = self.crawler_finished_at or datetime.now(timezone.utc) + return self._runtime_offset + finished_at - self.crawler_last_started_at + return self._runtime_offset + @computed_field(alias='requestTotalDurationMillis', return_type=timedelta_ms) # type: ignore[prop-decorator] @property def request_total_duration(self) -> timedelta: diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py index 667b96eebe..51735b0056 100644 --- a/src/crawlee/statistics/_statistics.py +++ b/src/crawlee/statistics/_statistics.py @@ -110,9 +110,6 @@ def __init__( # Flag to indicate the context state. self._active = False - # Pre-existing runtime offset, that can be non-zero when restoring serialized state from KVS. 
- self._runtime_offset = timedelta(seconds=0) - def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]: """Create near copy of the `Statistics` with replaced `state_model`.""" new_statistics: Statistics[TNewStatisticsState] = Statistics( @@ -168,8 +165,8 @@ async def __aenter__(self) -> Self: raise RuntimeError(f'The {self.__class__.__name__} is already active.') await self._state.initialize() - - self._runtime_offset = self.state.crawler_runtime + # Reset `crawler_finished_at` to indicate a new run in progress. + self.state.crawler_finished_at = None # Start periodic logging and let it print initial state before activation. self._periodic_logger.start() @@ -200,10 +197,6 @@ async def __aexit__( # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime await self._periodic_logger.stop() self.state.crawler_finished_at = datetime.now(timezone.utc) - self.state.crawler_runtime = ( - self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at - ) - self._active = False await self._state.teardown() @@ -262,20 +255,8 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None: del self._requests_in_progress[request_id_or_key] - def _update_crawler_runtime(self) -> None: - current_run_duration = ( - (datetime.now(timezone.utc) - self.state.crawler_last_started_at) - if self.state.crawler_last_started_at - else timedelta() - ) - self.state.crawler_runtime = current_run_duration + self._runtime_offset - def calculate(self) -> FinalStatistics: """Calculate the current statistics.""" - if self._active: - # Only update state when active. If not, just report the last known runtime. 
- self._update_crawler_runtime() - total_minutes = self.state.crawler_runtime.total_seconds() / 60 state = self._state.current_value serialized_state = state.model_dump(by_alias=False) From 4bf2bf133402721a32d7377cae9edddb4165e2e1 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 14 Nov 2025 15:19:30 +0100 Subject: [PATCH 3/5] Make it non-breaking change --- src/crawlee/statistics/_models.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/crawlee/statistics/_models.py b/src/crawlee/statistics/_models.py index 3c94f7f4f6..405717ef2a 100644 --- a/src/crawlee/statistics/_models.py +++ b/src/crawlee/statistics/_models.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import warnings from dataclasses import asdict, dataclass from datetime import datetime, timedelta, timezone from typing import Annotated, Any @@ -98,7 +99,6 @@ class StatisticsState(BaseModel): def model_post_init(self, /, __context: Any) -> None: self._runtime_offset = self.crawler_runtime or self._runtime_offset - @computed_field(alias='crawlerRuntimeMillis') # type: ignore[prop-decorator] @property def crawler_runtime(self) -> timedelta: if self.crawler_last_started_at: @@ -106,6 +106,23 @@ def crawler_runtime(self) -> timedelta: return self._runtime_offset + finished_at - self.crawler_last_started_at return self._runtime_offset + @crawler_runtime.setter + def crawler_runtime(self, value: timedelta) -> None: + # Setter for backwards compatibility only, the crawler_runtime is now computed_field, and cant be set manually. + warnings.warn( + f"Setting 'crawler_runtime' is deprecated and will be removed in a future version." 
+ f' Value {value} will not be used.', + DeprecationWarning, + stacklevel=2, + ) + + @computed_field(alias='crawlerRuntimeMillis') + def crawler_runtime_for_serialization(self) -> timedelta: + if self.crawler_last_started_at: + finished_at = self.crawler_finished_at or datetime.now(timezone.utc) + return self._runtime_offset + finished_at - self.crawler_last_started_at + return self._runtime_offset + @computed_field(alias='requestTotalDurationMillis', return_type=timedelta_ms) # type: ignore[prop-decorator] @property def request_total_duration(self) -> timedelta: From 596991030ce0b5789aa8f926cc8540de8b3765af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20Proch=C3=A1zka?= Date: Fri, 14 Nov 2025 15:35:25 +0100 Subject: [PATCH 4/5] Remove stress testing --- tests/unit/crawlers/_basic/test_basic_crawler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 298a1157d0..c7dad2725c 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -1673,8 +1673,7 @@ def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsSta return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir)) -@pytest.mark.parametrize('_', range(100)) -async def test_crawler_statistics_persistence(tmp_path: Path, _: int) -> None: +async def test_crawler_statistics_persistence(tmp_path: Path) -> None: """Test that crawler statistics persist and are loaded correctly. 
This test simulates starting the crawler process twice, and checks that the statistics include first run.""" From 971938159f65c975b9619e859f9e0dd1baac0f10 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 18 Nov 2025 09:54:47 +0100 Subject: [PATCH 5/5] Add link to the issue about removing deprecated method --- src/crawlee/statistics/_models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/crawlee/statistics/_models.py b/src/crawlee/statistics/_models.py index 405717ef2a..11b4310f3a 100644 --- a/src/crawlee/statistics/_models.py +++ b/src/crawlee/statistics/_models.py @@ -109,6 +109,7 @@ def crawler_runtime(self) -> timedelta: @crawler_runtime.setter def crawler_runtime(self, value: timedelta) -> None: # Setter for backwards compatibility only, the crawler_runtime is now computed_field, and cant be set manually. + # To be removed in v2 release https://github.com/apify/crawlee-python/issues/1567 warnings.warn( f"Setting 'crawler_runtime' is deprecated and will be removed in a future version." f' Value {value} will not be used.',