Skip to content
Merged
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ keywords = [
"scraping",
]
dependencies = [
"async-timeout>=5.0.1",
"cachetools>=5.5.0",
"colorama>=0.4.0",
"impit>=0.8.0",
Expand Down
42 changes: 41 additions & 1 deletion src/crawlee/_utils/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING

from async_timeout import Timeout, timeout

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import timedelta
from types import TracebackType

_SECONDS_PER_MINUTE = 60
_SECONDS_PER_HOUR = 3600
Expand Down Expand Up @@ -35,6 +38,43 @@ def measure_time() -> Iterator[TimerResult]:
result.cpu = after_cpu - before_cpu


class SharedTimeout:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea. I like this a lot.

Just thinking out loud, where this could lead to:
In case we need to create more granular timeouts for specific steps, I think this class could be easily expanded to support that. I am wondering if even the final context consumer (the request handler) could just use the timeout from here if the timeout is set on the context (see my other comment).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow, do you think that the request handler should be limited by a shared timeout? Or that it should have access to the remaining timeout "budget"?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not focus on the specific interface used in my example; it is only meant to illustrate the capability.

Imagine that the context would be created something like this:

pipeline_timeout = SharedTimeout(...)

BasicCrawlingContext(.....,
 timeouts={
"WholePipeline": pipeline_timeout,   # Maybe the other timeouts could be somehow limited by this one? 
"Navigation": pipeline_timeout.limited_to(NAVIGATION_LIMIT),
"RequestHandler": pipeline_timeout.limited_to(HANDLER_LIMIT)
})

And each timeout-protected context consumer would pick the relevant timeout from the context and apply it. Context consumers could even modify the timeouts of the steps that follow them.

For example, users could mutate the "RequestHandler" timeout in pre-navigation hooks.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to block this change for this. If needed, we can discuss here: #1596

"""Keeps track of a time budget shared by multiple independent async operations.

Provides a reusable, non-reentrant context manager interface.
"""

def __init__(self, timeout: timedelta) -> None:
self._remaining_timeout = timeout
self._active_timeout: Timeout | None = None
self._activation_timestamp: float | None = None

async def __aenter__(self) -> timedelta:
if self._active_timeout is not None or self._activation_timestamp is not None:
raise RuntimeError('A shared timeout context cannot be entered twice at the same time')

self._activation_timestamp = time.monotonic()
self._active_timeout = new_timeout = timeout(self._remaining_timeout.total_seconds())
Comment thread
Pijukatel marked this conversation as resolved.
await new_timeout.__aenter__()
return self._remaining_timeout

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
if self._active_timeout is None or self._activation_timestamp is None:
raise RuntimeError('Logic error')

await self._active_timeout.__aexit__(exc_type, exc_value, exc_traceback)
elapsed = time.monotonic() - self._activation_timestamp
self._remaining_timeout = self._remaining_timeout - timedelta(seconds=elapsed)

self._active_timeout = None
self._activation_timestamp = None


def format_duration(duration: timedelta | None) -> str:
"""Format a timedelta into a human-readable string with appropriate units."""
if duration is None:
Expand Down
3 changes: 2 additions & 1 deletion src/crawlee/crawlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from crawlee._utils.try_import import install_import_hook as _install_import_hook
from crawlee._utils.try_import import try_import as _try_import

from ._abstract_http import AbstractHttpCrawler, AbstractHttpParser, ParsedHttpCrawlingContext
from ._abstract_http import AbstractHttpCrawler, AbstractHttpParser, HttpCrawlerOptions, ParsedHttpCrawlingContext
from ._basic import BasicCrawler, BasicCrawlerOptions, BasicCrawlingContext, ContextPipeline
from ._http import HttpCrawler, HttpCrawlingContext, HttpCrawlingResult

Expand Down Expand Up @@ -51,6 +51,7 @@
'BeautifulSoupParserType',
'ContextPipeline',
'HttpCrawler',
'HttpCrawlerOptions',
'HttpCrawlingContext',
'HttpCrawlingResult',
'ParsedHttpCrawlingContext',
Expand Down
3 changes: 2 additions & 1 deletion src/crawlee/crawlers/_abstract_http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from ._abstract_http_crawler import AbstractHttpCrawler
from ._abstract_http_crawler import AbstractHttpCrawler, HttpCrawlerOptions
from ._abstract_http_parser import AbstractHttpParser
from ._http_crawling_context import ParsedHttpCrawlingContext

__all__ = [
'AbstractHttpCrawler',
'AbstractHttpParser',
'HttpCrawlerOptions',
'ParsedHttpCrawlingContext',
]
48 changes: 38 additions & 10 deletions src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import asyncio
import logging
from abc import ABC
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Generic

from more_itertools import partition
from pydantic import ValidationError
from typing_extensions import TypeVar
from typing_extensions import NotRequired, TypeVar

from crawlee._request import Request, RequestOptions
from crawlee._utils.docs import docs_group
from crawlee._utils.time import SharedTimeout
from crawlee._utils.urls import to_absolute_url_iterator
from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline
from crawlee.errors import SessionError
Expand All @@ -32,6 +34,19 @@
TStatisticsState = TypeVar('TStatisticsState', bound=StatisticsState, default=StatisticsState)


class HttpCrawlerOptions(
    BasicCrawlerOptions[TCrawlingContext, TStatisticsState],
    Generic[TCrawlingContext, TStatisticsState],
):
    """Arguments for the `AbstractHttpCrawler` constructor.

    Extends `BasicCrawlerOptions` with the HTTP-specific `navigation_timeout` option.

    It is intended for typing forwarded `__init__` arguments in the subclasses.
    """

    # Optional. `AbstractHttpCrawler` falls back to one minute when this is `None`. The budget is shared by the
    # pre-navigation hooks and the HTTP request itself, so slow hooks leave less time for the request.
    navigation_timeout: NotRequired[timedelta | None]
    """Timeout for the HTTP request."""


@docs_group('Crawlers')
class AbstractHttpCrawler(
BasicCrawler[TCrawlingContext, StatisticsState],
Expand All @@ -56,10 +71,13 @@ def __init__(
self,
*,
parser: AbstractHttpParser[TParseResult, TSelectResult],
navigation_timeout: timedelta | None = None,
**kwargs: Unpack[BasicCrawlerOptions[TCrawlingContext, StatisticsState]],
) -> None:
self._parser = parser
self._navigation_timeout = navigation_timeout or timedelta(minutes=1)
self._pre_navigation_hooks: list[Callable[[BasicCrawlingContext], Awaitable[None]]] = []
self._shared_navigation_timeouts: dict[int, SharedTimeout] = {}

if '_context_pipeline' not in kwargs:
raise ValueError(
Expand Down Expand Up @@ -112,9 +130,17 @@ def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpC
async def _execute_pre_navigation_hooks(
self, context: BasicCrawlingContext
) -> AsyncGenerator[BasicCrawlingContext, None]:
for hook in self._pre_navigation_hooks:
await hook(context)
yield context
context_id = id(context)
self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout)

try:
for hook in self._pre_navigation_hooks:
async with self._shared_navigation_timeouts[context_id]:
await hook(context)

yield context
finally:
self._shared_navigation_timeouts.pop(context_id, None)

async def _parse_http_response(
self, context: HttpCrawlingContext
Expand Down Expand Up @@ -222,12 +248,14 @@ async def _make_http_request(self, context: BasicCrawlingContext) -> AsyncGenera
Yields:
The original crawling context enhanced by HTTP response.
"""
result = await self._http_client.crawl(
request=context.request,
session=context.session,
proxy_info=context.proxy_info,
statistics=self._statistics,
)
async with self._shared_navigation_timeouts[id(context)] as remaining_timeout:
result = await self._http_client.crawl(
request=context.request,
session=context.session,
proxy_info=context.proxy_info,
statistics=self._statistics,
timeout=remaining_timeout,
)

yield HttpCrawlingContext.from_basic_crawling_context(context=context, http_response=result.http_response)

Expand Down
15 changes: 9 additions & 6 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1507,12 +1507,15 @@ async def __run_task_function(self) -> None:
raise

async def _run_request_handler(self, context: BasicCrawlingContext) -> None:
await wait_for(
Comment thread
janbuchar marked this conversation as resolved.
lambda: self._context_pipeline(context, self.router),
timeout=self._request_handler_timeout,
timeout_message=f'{self._request_handler_timeout_text}'
f' {self._request_handler_timeout.total_seconds()} seconds',
logger=self._logger,
await self._context_pipeline(
context,
lambda final_context: wait_for(
lambda: self.router(final_context),
timeout=self._request_handler_timeout,
timeout_message=f'{self._request_handler_timeout_text}'
f' {self._request_handler_timeout.total_seconds()} seconds',
logger=self._logger,
),
)

def _raise_for_error_status_code(self, status_code: int) -> None:
Expand Down
4 changes: 2 additions & 2 deletions src/crawlee/crawlers/_beautifulsoup/_beautifulsoup_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from bs4 import BeautifulSoup, Tag

from crawlee._utils.docs import docs_group
from crawlee.crawlers import AbstractHttpCrawler, BasicCrawlerOptions
from crawlee.crawlers import AbstractHttpCrawler, HttpCrawlerOptions

from ._beautifulsoup_crawling_context import BeautifulSoupCrawlingContext
from ._beautifulsoup_parser import BeautifulSoupParser, BeautifulSoupParserType
Expand Down Expand Up @@ -58,7 +58,7 @@ def __init__(
self,
*,
parser: BeautifulSoupParserType = 'lxml',
**kwargs: Unpack[BasicCrawlerOptions[BeautifulSoupCrawlingContext]],
**kwargs: Unpack[HttpCrawlerOptions[BeautifulSoupCrawlingContext]],
) -> None:
"""Initialize a new instance.

Expand Down
4 changes: 2 additions & 2 deletions src/crawlee/crawlers/_parsel/_parsel_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from parsel import Selector

from crawlee._utils.docs import docs_group
from crawlee.crawlers import AbstractHttpCrawler, BasicCrawlerOptions
from crawlee.crawlers import AbstractHttpCrawler, HttpCrawlerOptions

from ._parsel_crawling_context import ParselCrawlingContext
from ._parsel_parser import ParselParser
Expand Down Expand Up @@ -56,7 +56,7 @@ async def request_handler(context: ParselCrawlingContext) -> None:

def __init__(
self,
**kwargs: Unpack[BasicCrawlerOptions[ParselCrawlingContext]],
**kwargs: Unpack[HttpCrawlerOptions[ParselCrawlingContext]],
) -> None:
"""Initialize a new instance.

Expand Down
41 changes: 34 additions & 7 deletions src/crawlee/crawlers/_playwright/_playwright_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import asyncio
import logging
import warnings
from datetime import timedelta
from functools import partial
from typing import TYPE_CHECKING, Any, Generic, Literal

import playwright.async_api
from more_itertools import partition
from pydantic import ValidationError
from typing_extensions import NotRequired, TypedDict, TypeVar

from crawlee import service_locator
from crawlee._request import Request, RequestOptions
from crawlee._types import ConcurrencySettings
from crawlee._types import (
BasicCrawlingContext,
ConcurrencySettings,
)
from crawlee._utils.blocked import RETRY_CSS_SELECTORS
from crawlee._utils.docs import docs_group
from crawlee._utils.robots import RobotsTxtFile
from crawlee._utils.time import SharedTimeout
from crawlee._utils.urls import to_absolute_url_iterator
from crawlee.browsers import BrowserPool
from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline
Expand Down Expand Up @@ -44,7 +50,6 @@

from crawlee import RequestTransformAction
from crawlee._types import (
BasicCrawlingContext,
EnqueueLinksKwargs,
ExtractLinksFunction,
HttpHeaders,
Expand Down Expand Up @@ -106,6 +111,7 @@ def __init__(
fingerprint_generator: FingerprintGenerator | None | Literal['default'] = 'default',
headless: bool | None = None,
use_incognito_pages: bool | None = None,
navigation_timeout: timedelta | None = None,
**kwargs: Unpack[BasicCrawlerOptions[PlaywrightCrawlingContext, StatisticsState]],
) -> None:
"""Initialize a new instance.
Expand Down Expand Up @@ -134,12 +140,16 @@ def __init__(
use_incognito_pages: By default pages share the same browser context. If set to True each page uses its
own context that is destroyed once the page is closed or crashes.
This option should not be used if `browser_pool` is provided.
navigation_timeout: Timeout for navigation (the process between opening a Playwright page and calling
Comment thread
janbuchar marked this conversation as resolved.
the request handler)
kwargs: Additional keyword arguments to pass to the underlying `BasicCrawler`.
"""
configuration = kwargs.pop('configuration', None)
if configuration is not None:
service_locator.set_configuration(configuration)

self._shared_navigation_timeouts: dict[int, SharedTimeout] = {}

if browser_pool:
# Raise an exception if browser_pool is provided together with other browser-related arguments.
if any(
Expand Down Expand Up @@ -202,6 +212,8 @@ def __init__(
if 'concurrency_settings' not in kwargs or kwargs['concurrency_settings'] is None:
kwargs['concurrency_settings'] = ConcurrencySettings(desired_concurrency=1)

self._navigation_timeout = navigation_timeout or timedelta(minutes=1)

super().__init__(**kwargs)

async def _open_page(
Expand All @@ -228,10 +240,18 @@ async def _open_page(
block_requests=partial(block_requests, page=crawlee_page.page),
)

async with browser_page_context(crawlee_page.page):
for hook in self._pre_navigation_hooks:
await hook(pre_navigation_context)
yield pre_navigation_context
context_id = id(pre_navigation_context)
self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout)

try:
async with browser_page_context(crawlee_page.page):
for hook in self._pre_navigation_hooks:
async with self._shared_navigation_timeouts[context_id]:
await hook(pre_navigation_context)

yield pre_navigation_context
finally:
self._shared_navigation_timeouts.pop(context_id, None)

def _prepare_request_interceptor(
self,
Expand Down Expand Up @@ -266,6 +286,7 @@ async def _navigate(
Raises:
ValueError: If the browser pool is not initialized.
SessionError: If the URL cannot be loaded by the browser.
TimeoutError: If navigation does not succeed within the navigation timeout.

Yields:
The enhanced crawling context with the Playwright-specific features (page, response, enqueue_links,
Expand Down Expand Up @@ -297,7 +318,13 @@ async def _navigate(
# Set route_handler only for current request
await context.page.route(context.request.url, route_handler)

response = await context.page.goto(context.request.url)
try:
async with self._shared_navigation_timeouts[id(context)] as remaining_timeout:
response = await context.page.goto(
context.request.url, timeout=remaining_timeout.total_seconds() * 1000
)
except playwright.async_api.TimeoutError as exc:
raise asyncio.TimeoutError from exc

if response is None:
raise SessionError(f'Failed to load the URL: {context.request.url}')
Expand Down
8 changes: 7 additions & 1 deletion src/crawlee/crawlers/_playwright/_playwright_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async def crawl(
session: Session | None = None,
proxy_info: ProxyInfo | None = None,
statistics: Statistics | None = None,
timeout: timedelta | None = None,
) -> HttpCrawlingResult:
raise NotImplementedError('The `crawl` method should not be used for `PlaywrightHttpClient`')

Expand All @@ -72,6 +73,7 @@ async def send_request(
payload: HttpPayload | None = None,
session: Session | None = None,
proxy_info: ProxyInfo | None = None,
timeout: timedelta | None = None,
) -> HttpResponse:
# `proxy_info` are not used because `APIRequestContext` inherits the proxy from `BrowserContext`
# TODO: Use `session` to restore all the fingerprint headers according to the `BrowserContext`, after resolved
Expand All @@ -87,7 +89,11 @@ async def send_request(

# Proxies appropriate to the browser context are used
response = await browser_context.request.fetch(
url_or_request=url, method=method.lower(), headers=dict(headers) if headers else None, data=payload
url_or_request=url,
method=method.lower(),
headers=dict(headers) if headers else None,
data=payload,
# Playwright's `APIRequestContext.fetch` expects its timeout in milliseconds, not seconds.
timeout=timeout.total_seconds() * 1000 if timeout else None,
)

return await PlaywrightHttpResponse.from_playwright_response(response, protocol='')
Expand Down
Loading