diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1d4365d..9d7e50f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,9 @@ Unreleased Added ##### +- Add ``AsyncAnticaptchaClient`` and ``AsyncJob`` for async/await usage with ``httpx`` (``pip install python-anticaptcha[async]``) +- Rename ``base.py`` → ``sync_client.py`` for symmetry with ``async_client.py``; backward-compatible ``base.py`` shim preserved +- Rename sync example files with ``sync_`` prefix to match ``async_`` examples - Add context manager support to ``AnticaptchaClient`` (``__enter__``, ``__exit__``, ``close``) - Add ``ANTICAPTCHA_API_KEY`` environment variable fallback for ``AnticaptchaClient`` - Add ``Proxy`` frozen dataclass with ``parse_url()`` and ``to_kwargs()`` methods diff --git a/README.md b/README.md index b91cb75..e1efe2d 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,12 @@ Install as standard Python package using: pip install python-anticaptcha ``` +For async support (FastAPI, aiohttp, Starlette, etc.): + +``` +pip install python-anticaptcha[async] +``` + ## Usage To use this library you need [Anticaptcha.com](http://getcaptchasolution.com/p9bwplkicx) API key. @@ -42,6 +48,29 @@ with AnticaptchaClient(api_key) as client: job.join() ``` +## Async Usage + +For async frameworks, use `AsyncAnticaptchaClient` — the API mirrors the sync +client but all methods are awaitable: + +```python +from python_anticaptcha import AsyncAnticaptchaClient, NoCaptchaTaskProxylessTask + +api_key = '174faff8fbc769e94a5862391ecfd010' +site_key = '6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-' +url = 'https://www.google.com/recaptcha/api2/demo' + +async with AsyncAnticaptchaClient(api_key) as client: + task = NoCaptchaTaskProxylessTask(url, site_key) + job = await client.create_task(task) + await job.join() + print(job.get_solution_response()) +``` + +The full integration example is available in file `examples/async_recaptcha_request.py`. + +## Sync Usage + ### Solve recaptcha Example snippet for Recaptcha: @@ -60,7 +89,7 @@ job.join() print(job.get_solution_response()) ``` -The full integration example is available in file `examples/recaptcha_request.py`. +The full integration example is available in file `examples/sync_recaptcha_request.py`. If you process the same page many times, to increase reliability you can specify whether the captcha is visible or not. This parameter is not required, as the diff --git a/docs/api.rst b/docs/api.rst index bee2fd0..02f80e9 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,10 +1,17 @@ API === -Base ----- +Sync Client +----------- - .. automodule:: python_anticaptcha.base + .. automodule:: python_anticaptcha.sync_client + :members: + :undoc-members: + +Async Client +------------ + + .. automodule:: python_anticaptcha.async_client :members: :undoc-members: diff --git a/docs/usage.rst b/docs/usage.rst index 633b1f1..ecc3edb 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -21,6 +21,35 @@ The client can be used as a context manager to ensure the underlying session is job = client.create_task(task) job.join() +Async client +############ + +.. note:: + + Requires the ``async`` extra: ``pip install python-anticaptcha[async]`` + +For async frameworks (FastAPI, aiohttp, Starlette) use ``AsyncAnticaptchaClient`` — +the API mirrors the sync client but all methods are awaitable: + +.. code:: python + + from python_anticaptcha import AsyncAnticaptchaClient, NoCaptchaTaskProxylessTask + + api_key = '174faff8fbc769e94a5862391ecfd010' + site_key = '6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-' + url = 'https://www.google.com/recaptcha/api2/demo' + + async with AsyncAnticaptchaClient(api_key) as client: + task = NoCaptchaTaskProxylessTask(url, site_key) + job = await client.create_task(task) + await job.join() + print(job.get_solution_response()) + +The full integration example is available in file ``examples/async_recaptcha_request.py``. + +Sync client +########### + Solve recaptcha ############### @@ -40,7 +69,7 @@ Example snippet for Recaptcha: job.join() print(job.get_solution_response()) -The full integration example is available in file ``examples/recaptcha_request.py``. +The full integration example is available in file ``examples/sync_recaptcha_request.py``. If you process the same page many times, to increase reliability you can specify whether the captcha is visible or not. This parameter is not required, as the diff --git a/examples/async_balance.py b/examples/async_balance.py new file mode 100644 index 0000000..49d1d85 --- /dev/null +++ b/examples/async_balance.py @@ -0,0 +1,17 @@ +import asyncio +from os import environ +from pprint import pprint + +from python_anticaptcha import AsyncAnticaptchaClient + +api_key = environ["KEY"] + + +async def process(): + async with AsyncAnticaptchaClient(api_key) as client: + balance = await client.get_balance() + pprint(balance) + + +if __name__ == "__main__": + asyncio.run(process()) diff --git a/examples/async_recaptcha_request.py b/examples/async_recaptcha_request.py new file mode 100644 index 0000000..b3dcaa1 --- /dev/null +++ b/examples/async_recaptcha_request.py @@ -0,0 +1,40 @@ +import asyncio +import re +from os import environ + +import httpx + +from python_anticaptcha import AsyncAnticaptchaClient, NoCaptchaTaskProxylessTask + +api_key = environ["KEY"] +site_key_pattern = 'data-sitekey="(.+?)"' +url = "https://www.google.com/recaptcha/api2/demo?invisible=false" +EXPECTED_RESULT = "Verification Success... Hooray!" + + +async def get_form_html(session: httpx.AsyncClient) -> str: + return (await session.get(url)).text + + +async def get_token(client: AsyncAnticaptchaClient, form_html: str) -> str: + site_key = re.search(site_key_pattern, form_html).group(1) + task = NoCaptchaTaskProxylessTask(website_url=url, website_key=site_key) + job = await client.create_task(task) + await job.join() + return job.get_solution_response() + + +async def form_submit(session: httpx.AsyncClient, token: str) -> str: + return (await session.post(url, data={"g-recaptcha-response": token})).text + + +async def process(): + async with AsyncAnticaptchaClient(api_key) as client, httpx.AsyncClient() as session: + html = await get_form_html(session) + token = await get_token(client, html) + return await form_submit(session, token) + + +if __name__ == "__main__": + result = asyncio.run(process()) + assert "Verification Success... Hooray!" in result diff --git a/examples/antigate.py b/examples/sync_antigate.py similarity index 100% rename from examples/antigate.py rename to examples/sync_antigate.py diff --git a/examples/app_stat.py b/examples/sync_app_stat.py similarity index 100% rename from examples/app_stat.py rename to examples/sync_app_stat.py diff --git a/examples/balance.py b/examples/sync_balance.py similarity index 100% rename from examples/balance.py rename to examples/sync_balance.py diff --git a/examples/funcaptcha_request.py b/examples/sync_funcaptcha_request.py similarity index 100% rename from examples/funcaptcha_request.py rename to examples/sync_funcaptcha_request.py diff --git a/examples/funcaptcha_selenium.py b/examples/sync_funcaptcha_selenium.py similarity index 100% rename from examples/funcaptcha_selenium.py rename to examples/sync_funcaptcha_selenium.py diff --git a/examples/funcaptcha_selenium_callback.py b/examples/sync_funcaptcha_selenium_callback.py similarity index 100% rename from examples/funcaptcha_selenium_callback.py rename to examples/sync_funcaptcha_selenium_callback.py diff --git a/examples/hcaptcha_request.py b/examples/sync_hcaptcha_request.py similarity index 100% rename from examples/hcaptcha_request.py rename to examples/sync_hcaptcha_request.py diff --git a/examples/hcaptcha_request_proxy.py b/examples/sync_hcaptcha_request_proxy.py similarity index 100% rename from examples/hcaptcha_request_proxy.py rename to examples/sync_hcaptcha_request_proxy.py diff --git a/examples/recaptcha3_request.py b/examples/sync_recaptcha3_request.py similarity index 100% rename from examples/recaptcha3_request.py rename to examples/sync_recaptcha3_request.py diff --git a/examples/recaptcha_request.py b/examples/sync_recaptcha_request.py similarity index 100% rename from examples/recaptcha_request.py rename to examples/sync_recaptcha_request.py diff --git a/examples/recaptcha_selenium.py b/examples/sync_recaptcha_selenium.py similarity index 100% rename from examples/recaptcha_selenium.py rename to examples/sync_recaptcha_selenium.py diff --git a/examples/recaptcha_selenium_callback.py b/examples/sync_recaptcha_selenium_callback.py similarity index 100% rename from examples/recaptcha_selenium_callback.py rename to examples/sync_recaptcha_selenium_callback.py diff --git a/examples/remote_image.py b/examples/sync_remote_image.py similarity index 100% rename from examples/remote_image.py rename to examples/sync_remote_image.py diff --git a/examples/text.py b/examples/sync_text.py similarity index 100% rename from examples/text.py rename to examples/sync_text.py diff --git a/examples/text_stream.py b/examples/sync_text_stream.py similarity index 100% rename from examples/text_stream.py rename to examples/sync_text_stream.py diff --git a/pyproject.toml b/pyproject.toml index 82ec05b..6981df6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ license = "MIT" requires-python = ">=3.9" dependencies = ["requests"] dynamic = ["version"] -keywords = ["recaptcha", "captcha", "development"] +keywords = ["recaptcha", "captcha", "development", "asyncio", "async", "httpx"] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", @@ -21,6 +21,8 @@ classifiers = [ "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.14", + "Framework :: AsyncIO", + "Topic :: Internet :: WWW/HTTP", ] [project.urls] @@ -28,7 +30,7 @@ Homepage = "https://github.com/ad-m/python-anticaptcha" [project.optional-dependencies] async = ["httpx>=0.24"] -tests = ["pytest", "retry", "selenium"] +tests = ["pytest", "pytest-asyncio", "httpx>=0.24", "retry", "selenium", "six"] docs = ["sphinx", "sphinx-rtd-theme"] [tool.setuptools.package-data] diff --git a/python_anticaptcha/__init__.py b/python_anticaptcha/__init__.py index 76044a9..dfa2bfd 100644 --- a/python_anticaptcha/__init__.py +++ b/python_anticaptcha/__init__.py @@ -1,9 +1,9 @@ import contextlib from importlib.metadata import PackageNotFoundError, version -from .base import AnticaptchaClient, Job from .exceptions import AnticaptchaException from .proxy import Proxy +from .sync_client import AnticaptchaClient, Job from .tasks import ( AntiGateTask, AntiGateTaskProxyless, @@ -28,6 +28,17 @@ with contextlib.suppress(PackageNotFoundError): __version__ = version(__name__) + +def __getattr__(name: str) -> type: + if name in ("AsyncAnticaptchaClient", "AsyncJob"): + from .async_client import AsyncAnticaptchaClient, AsyncJob + + globals()["AsyncAnticaptchaClient"] = AsyncAnticaptchaClient + globals()["AsyncJob"] = AsyncJob + return globals()[name] + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + __all__ = [ "AnticaptchaClient", "Job", @@ -50,4 +61,6 @@ "AntiGateTask", "AnticaptchaException", "AnticatpchaException", + "AsyncAnticaptchaClient", + "AsyncJob", ] diff --git a/python_anticaptcha/async_client.py b/python_anticaptcha/async_client.py new file mode 100644 index 0000000..c496573 --- /dev/null +++ b/python_anticaptcha/async_client.py @@ -0,0 +1,237 @@ +from __future__ import annotations + +import asyncio +import os +from types import TracebackType +from typing import Any, Callable, Literal +from urllib.parse import urljoin + +try: + import httpx # type: ignore[import-not-found] +except ImportError: + httpx = None # type: ignore[assignment] + +from .exceptions import AnticaptchaException +from .tasks import BaseTask + +SLEEP_EVERY_CHECK_FINISHED = 3 +MAXIMUM_JOIN_TIME = 60 * 5 + + +class AsyncJob: + client: AsyncAnticaptchaClient + task_id: int + _last_result: dict[str, Any] | None = None + + def __init__(self, client: AsyncAnticaptchaClient, task_id: int) -> None: + self.client = client + self.task_id = task_id + + async def _update(self) -> None: + self._last_result = await self.client.getTaskResult(self.task_id) + + async def check_is_ready(self) -> bool: + await self._update() + assert self._last_result is not None + return self._last_result["status"] == "ready" + + def get_solution_response(self) -> str: # Recaptcha + assert self._last_result is not None + return self._last_result["solution"]["gRecaptchaResponse"] + + def get_solution(self) -> dict[str, Any]: + assert self._last_result is not None + return self._last_result["solution"] + + def get_token_response(self) -> str: # Funcaptcha + assert self._last_result is not None + return self._last_result["solution"]["token"] + + def get_answers(self) -> dict[str, str]: + assert self._last_result is not None + return self._last_result["solution"]["answers"] + + def get_captcha_text(self) -> str: # Image + assert self._last_result is not None + return self._last_result["solution"]["text"] + + def get_cells_numbers(self) -> list[int]: + assert self._last_result is not None + return self._last_result["solution"]["cellNumbers"] + + async def report_incorrect_image(self) -> bool: + return await self.client.reportIncorrectImage(self.task_id) + + async def report_incorrect_recaptcha(self) -> bool: + return await self.client.reportIncorrectRecaptcha(self.task_id) + + def __repr__(self) -> str: + status = self._last_result.get("status") if self._last_result else None + if status: + return f"" + return f"" + + async def join( + self, + maximum_time: int | None = None, + on_check: Callable[[int, str | None], None] | None = None, + backoff: bool = False, + ) -> None: + """Poll for task completion, sleeping asynchronously until ready or timeout. + + :param maximum_time: Maximum seconds to wait (default: ``MAXIMUM_JOIN_TIME``). + :param on_check: Optional callback invoked after each poll with + ``(elapsed_time, status)`` where *elapsed_time* is the total seconds + waited so far and *status* is the last task status string + (e.g. ``"processing"``). + :param backoff: When ``True``, use exponential backoff for polling + intervals starting at 1 second and doubling up to a 10-second cap. + Default ``False`` preserves the fixed 3-second interval. + :raises AnticaptchaException: If *maximum_time* is exceeded. + """ + elapsed_time = 0 + maximum_time = maximum_time or MAXIMUM_JOIN_TIME + sleep_time = 1 if backoff else SLEEP_EVERY_CHECK_FINISHED + while not await self.check_is_ready(): + await asyncio.sleep(sleep_time) + elapsed_time += sleep_time + if backoff: + sleep_time = min(sleep_time * 2, 10) + if on_check is not None and self._last_result is not None: + on_check(elapsed_time, self._last_result.get("status")) + if elapsed_time > maximum_time: + raise AnticaptchaException( + None, + 250, + f"The execution time exceeded a maximum time of {maximum_time} seconds." + f" It takes {elapsed_time} seconds.", + ) + + +class AsyncAnticaptchaClient: + client_key = None + CREATE_TASK_URL = "/createTask" + TASK_RESULT_URL = "/getTaskResult" + BALANCE_URL = "/getBalance" + REPORT_IMAGE_URL = "/reportIncorrectImageCaptcha" + REPORT_RECAPTCHA_URL = "/reportIncorrectRecaptcha" + APP_STAT_URL = "/getAppStats" + SOFT_ID = 847 + language_pool = "en" + response_timeout = 5 + + def __init__( + self, + client_key: str | None = None, + language_pool: str = "en", + host: str = "api.anti-captcha.com", + use_ssl: bool = True, + ) -> None: + if httpx is None: + raise ImportError( + "httpx is required for async support. Install it with: pip install python-anticaptcha[async]" + ) + self.client_key = client_key or os.environ.get("ANTICAPTCHA_API_KEY") + if not self.client_key: + raise AnticaptchaException( + None, + "CONFIG_ERROR", + "API key required. Pass client_key or set ANTICAPTCHA_API_KEY env var.", + ) + self.language_pool = language_pool + self.base_url = "{proto}://{host}/".format(proto="https" if use_ssl else "http", host=host) + self.session = httpx.AsyncClient() + + async def __aenter__(self) -> AsyncAnticaptchaClient: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> Literal[False]: + await self.session.aclose() + return False + + async def close(self) -> None: + await self.session.aclose() + + def __repr__(self) -> str: + from urllib.parse import urlparse + + host = urlparse(self.base_url).hostname or self.base_url + return f"" + + async def _get_client_ip(self) -> str: + if not hasattr(self, "_client_ip"): + response = await self.session.get("https://api.myip.com", timeout=self.response_timeout) + self._client_ip = response.json()["ip"] + return self._client_ip + + async def _check_response(self, response: dict[str, Any]) -> None: + if response.get("errorId", False) == 11: + ip = await self._get_client_ip() + response["errorDescription"] = "{} Your missing IP address is probably {}.".format( + response["errorDescription"], ip + ) + if response.get("errorId", False): + raise AnticaptchaException(response["errorId"], response["errorCode"], response["errorDescription"]) + + async def createTask(self, task: BaseTask) -> AsyncJob: + request = { + "clientKey": self.client_key, + "task": task.serialize(), + "softId": self.SOFT_ID, + "languagePool": self.language_pool, + } + response = ( + await self.session.post( + urljoin(self.base_url, self.CREATE_TASK_URL), + json=request, + timeout=self.response_timeout, + ) + ).json() + await self._check_response(response) + return AsyncJob(self, response["taskId"]) + + async def getTaskResult(self, task_id: int) -> dict[str, Any]: + request = {"clientKey": self.client_key, "taskId": task_id} + response = (await self.session.post(urljoin(self.base_url, self.TASK_RESULT_URL), json=request)).json() + await self._check_response(response) + return response + + async def getBalance(self) -> float: + request = { + "clientKey": self.client_key, + "softId": self.SOFT_ID, + } + response = (await self.session.post(urljoin(self.base_url, self.BALANCE_URL), json=request)).json() + await self._check_response(response) + return response["balance"] + + async def getAppStats(self, soft_id: int, mode: str) -> dict[str, Any]: + request = {"clientKey": self.client_key, "softId": soft_id, "mode": mode} + response = (await self.session.post(urljoin(self.base_url, self.APP_STAT_URL), json=request)).json() + await self._check_response(response) + return response + + async def reportIncorrectImage(self, task_id: int) -> bool: + request = {"clientKey": self.client_key, "taskId": task_id} + response = (await self.session.post(urljoin(self.base_url, self.REPORT_IMAGE_URL), json=request)).json() + await self._check_response(response) + return bool(response.get("status", False)) + + async def reportIncorrectRecaptcha(self, task_id: int) -> bool: + request = {"clientKey": self.client_key, "taskId": task_id} + response = (await self.session.post(urljoin(self.base_url, self.REPORT_RECAPTCHA_URL), json=request)).json() + await self._check_response(response) + return response["status"] == "success" + + # Snake_case aliases + create_task = createTask + get_task_result = getTaskResult + get_balance = getBalance + get_app_stats = getAppStats + report_incorrect_image = reportIncorrectImage + report_incorrect_recaptcha = reportIncorrectRecaptcha diff --git a/python_anticaptcha/base.py b/python_anticaptcha/base.py index 461e66a..e7d7979 100644 --- a/python_anticaptcha/base.py +++ b/python_anticaptcha/base.py @@ -1,277 +1,2 @@ -from __future__ import annotations - -import json -import os -import time -import warnings -from types import TracebackType -from typing import Any, Callable, Literal -from urllib.parse import urljoin - -import requests - -from .exceptions import AnticaptchaException -from .tasks import BaseTask - -SLEEP_EVERY_CHECK_FINISHED = 3 -MAXIMUM_JOIN_TIME = 60 * 5 - - -class Job: - client: AnticaptchaClient - task_id: int - _last_result: dict[str, Any] | None = None - - def __init__(self, client: AnticaptchaClient, task_id: int) -> None: - self.client = client - self.task_id = task_id - - def _update(self) -> None: - self._last_result = self.client.getTaskResult(self.task_id) - - def check_is_ready(self) -> bool: - self._update() - assert self._last_result is not None - return self._last_result["status"] == "ready" - - def get_solution_response(self) -> str: # Recaptcha - assert self._last_result is not None - return self._last_result["solution"]["gRecaptchaResponse"] - - def get_solution(self) -> dict[str, Any]: - assert self._last_result is not None - return self._last_result["solution"] - - def get_token_response(self) -> str: # Funcaptcha - assert self._last_result is not None - return self._last_result["solution"]["token"] - - def get_answers(self) -> dict[str, str]: - assert self._last_result is not None - return self._last_result["solution"]["answers"] - - def get_captcha_text(self) -> str: # Image - assert self._last_result is not None - return self._last_result["solution"]["text"] - - def get_cells_numbers(self) -> list[int]: - assert self._last_result is not None - return self._last_result["solution"]["cellNumbers"] - - def report_incorrect(self) -> bool: - warnings.warn( - "report_incorrect is deprecated, use report_incorrect_image instead", - DeprecationWarning, - stacklevel=2, - ) - return self.client.reportIncorrectImage(self.task_id) - - def report_incorrect_image(self) -> bool: - return self.client.reportIncorrectImage(self.task_id) - - def report_incorrect_recaptcha(self) -> bool: - return self.client.reportIncorrectRecaptcha(self.task_id) - - def __repr__(self) -> str: - status = self._last_result.get("status") if self._last_result else None - if status: - return f"" - return f"" - - def join( - self, - maximum_time: int | None = None, - on_check: Callable[[int, str | None], None] | None = None, - backoff: bool = False, - ) -> None: - """Poll for task completion, blocking until ready or timeout. - - :param maximum_time: Maximum seconds to wait (default: ``MAXIMUM_JOIN_TIME``). - :param on_check: Optional callback invoked after each poll with - ``(elapsed_time, status)`` where *elapsed_time* is the total seconds - waited so far and *status* is the last task status string - (e.g. ``"processing"``). - :param backoff: When ``True``, use exponential backoff for polling - intervals starting at 1 second and doubling up to a 10-second cap. - Default ``False`` preserves the fixed 3-second interval. - :raises AnticaptchaException: If *maximum_time* is exceeded. - """ - elapsed_time = 0 - maximum_time = maximum_time or MAXIMUM_JOIN_TIME - sleep_time = 1 if backoff else SLEEP_EVERY_CHECK_FINISHED - while not self.check_is_ready(): - time.sleep(sleep_time) - elapsed_time += sleep_time - if backoff: - sleep_time = min(sleep_time * 2, 10) - if on_check is not None and self._last_result is not None: - on_check(elapsed_time, self._last_result.get("status")) - if elapsed_time > maximum_time: - raise AnticaptchaException( - None, - 250, - f"The execution time exceeded a maximum time of {maximum_time} seconds." - f" It takes {elapsed_time} seconds.", - ) - - -class AnticaptchaClient: - client_key = None - CREATE_TASK_URL = "/createTask" - TASK_RESULT_URL = "/getTaskResult" - BALANCE_URL = "/getBalance" - REPORT_IMAGE_URL = "/reportIncorrectImageCaptcha" - REPORT_RECAPTCHA_URL = "/reportIncorrectRecaptcha" - APP_STAT_URL = "/getAppStats" - SOFT_ID = 847 - language_pool = "en" - response_timeout = 5 - - def __init__( - self, - client_key: str | None = None, - language_pool: str = "en", - host: str = "api.anti-captcha.com", - use_ssl: bool = True, - ) -> None: - self.client_key = client_key or os.environ.get("ANTICAPTCHA_API_KEY") - if not self.client_key: - raise AnticaptchaException( - None, - "CONFIG_ERROR", - "API key required. Pass client_key or set ANTICAPTCHA_API_KEY env var.", - ) - self.language_pool = language_pool - self.base_url = "{proto}://{host}/".format(proto="https" if use_ssl else "http", host=host) - self.session = requests.Session() - - def __enter__(self) -> AnticaptchaClient: - return self - - def __exit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> Literal[False]: - self.session.close() - return False - - def close(self) -> None: - self.session.close() - - def __repr__(self) -> str: - from urllib.parse import urlparse - - host = urlparse(self.base_url).hostname or self.base_url - return f"" - - @property - def client_ip(self) -> str: - if not hasattr(self, "_client_ip"): - self._client_ip = self.session.get("https://api.myip.com", timeout=self.response_timeout).json()["ip"] - return self._client_ip - - def _check_response(self, response: dict[str, Any]) -> None: - if response.get("errorId", False) == 11: - response["errorDescription"] = "{} Your missing IP address is probably {}.".format( - response["errorDescription"], self.client_ip - ) - if response.get("errorId", False): - raise AnticaptchaException(response["errorId"], response["errorCode"], response["errorDescription"]) - - def createTask(self, task: BaseTask) -> Job: - request = { - "clientKey": self.client_key, - "task": task.serialize(), - "softId": self.SOFT_ID, - "languagePool": self.language_pool, - } - response = self.session.post( - urljoin(self.base_url, self.CREATE_TASK_URL), - json=request, - timeout=self.response_timeout, - ).json() - self._check_response(response) - return Job(self, response["taskId"]) - - def createTaskSmee(self, task: BaseTask, timeout: int = MAXIMUM_JOIN_TIME) -> Job: - """ - Beta method to stream response from smee.io - """ - response = self.session.head("https://smee.io/new", timeout=self.response_timeout) - smee_url = response.headers["Location"] - task_data = task.serialize() - request = { - "clientKey": self.client_key, - "task": task_data, - "softId": self.SOFT_ID, - "languagePool": self.language_pool, - "callbackUrl": smee_url, - } - r = self.session.get( - url=smee_url, - headers={"Accept": "text/event-stream"}, - stream=True, - timeout=(self.response_timeout, timeout), - ) - create_response = self.session.post( - url=urljoin(self.base_url, self.CREATE_TASK_URL), - json=request, - timeout=self.response_timeout, - ).json() - self._check_response(create_response) - for line in r.iter_lines(): - content = line.decode("utf-8") - if '"host":"smee.io"' not in content: - continue - payload = json.loads(content.split(":", maxsplit=1)[1].strip()) - if "taskId" not in payload["body"] or str(payload["body"]["taskId"]) != str(create_response["taskId"]): - continue - r.close() - job = Job(client=self, task_id=create_response["taskId"]) - job._last_result = payload["body"] - return job - raise AnticaptchaException(None, 250, "No matching task response received from smee.io") - - def getTaskResult(self, task_id: int) -> dict[str, Any]: - request = {"clientKey": self.client_key, "taskId": task_id} - response = self.session.post(urljoin(self.base_url, self.TASK_RESULT_URL), json=request).json() - self._check_response(response) - return response - - def getBalance(self) -> float: - request = { - "clientKey": self.client_key, - "softId": self.SOFT_ID, - } - response = self.session.post(urljoin(self.base_url, self.BALANCE_URL), json=request).json() - self._check_response(response) - return response["balance"] - - def getAppStats(self, soft_id: int, mode: str) -> dict[str, Any]: - request = {"clientKey": self.client_key, "softId": soft_id, "mode": mode} - response = self.session.post(urljoin(self.base_url, self.APP_STAT_URL), json=request).json() - self._check_response(response) - return response - - def reportIncorrectImage(self, task_id: int) -> bool: - request = {"clientKey": self.client_key, "taskId": task_id} - response = self.session.post(urljoin(self.base_url, self.REPORT_IMAGE_URL), json=request).json() - self._check_response(response) - return bool(response.get("status", False)) - - def reportIncorrectRecaptcha(self, task_id: int) -> bool: - request = {"clientKey": self.client_key, "taskId": task_id} - response = self.session.post(urljoin(self.base_url, self.REPORT_RECAPTCHA_URL), json=request).json() - self._check_response(response) - return response["status"] == "success" - - # Snake_case aliases - create_task = createTask - create_task_smee = createTaskSmee - get_task_result = getTaskResult - get_balance = getBalance - get_app_stats = getAppStats - report_incorrect_image = reportIncorrectImage - report_incorrect_recaptcha = reportIncorrectRecaptcha +# Backward compatibility — canonical location is sync_client.py +from .sync_client import * # noqa: F401,F403 diff --git a/python_anticaptcha/sync_client.py b/python_anticaptcha/sync_client.py new file mode 100644 index 0000000..461e66a --- /dev/null +++ b/python_anticaptcha/sync_client.py @@ -0,0 +1,277 @@ +from __future__ import annotations + +import json +import os +import time +import warnings +from types import TracebackType +from typing import Any, Callable, Literal +from urllib.parse import urljoin + +import requests + +from .exceptions import AnticaptchaException +from .tasks import BaseTask + +SLEEP_EVERY_CHECK_FINISHED = 3 +MAXIMUM_JOIN_TIME = 60 * 5 + + +class Job: + client: AnticaptchaClient + task_id: int + _last_result: dict[str, Any] | None = None + + def __init__(self, client: AnticaptchaClient, task_id: int) -> None: + self.client = client + self.task_id = task_id + + def _update(self) -> None: + self._last_result = self.client.getTaskResult(self.task_id) + + def check_is_ready(self) -> bool: + self._update() + assert self._last_result is not None + return self._last_result["status"] == "ready" + + def get_solution_response(self) -> str: # Recaptcha + assert self._last_result is not None + return self._last_result["solution"]["gRecaptchaResponse"] + + def get_solution(self) -> dict[str, Any]: + assert self._last_result is not None + return self._last_result["solution"] + + def get_token_response(self) -> str: # Funcaptcha + assert self._last_result is not None + return self._last_result["solution"]["token"] + + def get_answers(self) -> dict[str, str]: + assert self._last_result is not None + return self._last_result["solution"]["answers"] + + def get_captcha_text(self) -> str: # Image + assert self._last_result is not None + return self._last_result["solution"]["text"] + + def get_cells_numbers(self) -> list[int]: + assert self._last_result is not None + return self._last_result["solution"]["cellNumbers"] + + def report_incorrect(self) -> bool: + warnings.warn( + "report_incorrect is deprecated, use report_incorrect_image instead", + DeprecationWarning, + stacklevel=2, + ) + return self.client.reportIncorrectImage(self.task_id) + + def report_incorrect_image(self) -> bool: + return self.client.reportIncorrectImage(self.task_id) + + def report_incorrect_recaptcha(self) -> bool: + return self.client.reportIncorrectRecaptcha(self.task_id) + + def __repr__(self) -> str: + status = self._last_result.get("status") if self._last_result else None + if status: + return f"" + return f"" + + def join( + self, + maximum_time: int | None = None, + on_check: Callable[[int, str | None], None] | None = None, + backoff: bool = False, + ) -> None: + """Poll for task completion, blocking until ready or timeout. + + :param maximum_time: Maximum seconds to wait (default: ``MAXIMUM_JOIN_TIME``). + :param on_check: Optional callback invoked after each poll with + ``(elapsed_time, status)`` where *elapsed_time* is the total seconds + waited so far and *status* is the last task status string + (e.g. ``"processing"``). + :param backoff: When ``True``, use exponential backoff for polling + intervals starting at 1 second and doubling up to a 10-second cap. + Default ``False`` preserves the fixed 3-second interval. + :raises AnticaptchaException: If *maximum_time* is exceeded. + """ + elapsed_time = 0 + maximum_time = maximum_time or MAXIMUM_JOIN_TIME + sleep_time = 1 if backoff else SLEEP_EVERY_CHECK_FINISHED + while not self.check_is_ready(): + time.sleep(sleep_time) + elapsed_time += sleep_time + if backoff: + sleep_time = min(sleep_time * 2, 10) + if on_check is not None and self._last_result is not None: + on_check(elapsed_time, self._last_result.get("status")) + if elapsed_time > maximum_time: + raise AnticaptchaException( + None, + 250, + f"The execution time exceeded a maximum time of {maximum_time} seconds." + f" It takes {elapsed_time} seconds.", + ) + + +class AnticaptchaClient: + client_key = None + CREATE_TASK_URL = "/createTask" + TASK_RESULT_URL = "/getTaskResult" + BALANCE_URL = "/getBalance" + REPORT_IMAGE_URL = "/reportIncorrectImageCaptcha" + REPORT_RECAPTCHA_URL = "/reportIncorrectRecaptcha" + APP_STAT_URL = "/getAppStats" + SOFT_ID = 847 + language_pool = "en" + response_timeout = 5 + + def __init__( + self, + client_key: str | None = None, + language_pool: str = "en", + host: str = "api.anti-captcha.com", + use_ssl: bool = True, + ) -> None: + self.client_key = client_key or os.environ.get("ANTICAPTCHA_API_KEY") + if not self.client_key: + raise AnticaptchaException( + None, + "CONFIG_ERROR", + "API key required. Pass client_key or set ANTICAPTCHA_API_KEY env var.", + ) + self.language_pool = language_pool + self.base_url = "{proto}://{host}/".format(proto="https" if use_ssl else "http", host=host) + self.session = requests.Session() + + def __enter__(self) -> AnticaptchaClient: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> Literal[False]: + self.session.close() + return False + + def close(self) -> None: + self.session.close() + + def __repr__(self) -> str: + from urllib.parse import urlparse + + host = urlparse(self.base_url).hostname or self.base_url + return f"" + + @property + def client_ip(self) -> str: + if not hasattr(self, "_client_ip"): + self._client_ip = self.session.get("https://api.myip.com", timeout=self.response_timeout).json()["ip"] + return self._client_ip + + def _check_response(self, response: dict[str, Any]) -> None: + if response.get("errorId", False) == 11: + response["errorDescription"] = "{} Your missing IP address is probably {}.".format( + response["errorDescription"], self.client_ip + ) + if response.get("errorId", False): + raise AnticaptchaException(response["errorId"], response["errorCode"], response["errorDescription"]) + + def createTask(self, task: BaseTask) -> Job: + request = { + "clientKey": self.client_key, + "task": task.serialize(), + "softId": self.SOFT_ID, + "languagePool": self.language_pool, + } + response = self.session.post( + urljoin(self.base_url, self.CREATE_TASK_URL), + json=request, + timeout=self.response_timeout, + ).json() + self._check_response(response) + return Job(self, response["taskId"]) + + def createTaskSmee(self, task: BaseTask, timeout: int = MAXIMUM_JOIN_TIME) -> Job: + """ + Beta method to stream response from smee.io + """ + response = self.session.head("https://smee.io/new", timeout=self.response_timeout) + smee_url = response.headers["Location"] + task_data = task.serialize() + request = { + "clientKey": self.client_key, + "task": task_data, + "softId": self.SOFT_ID, + "languagePool": self.language_pool, + "callbackUrl": smee_url, + } + r = self.session.get( + url=smee_url, + headers={"Accept": "text/event-stream"}, + stream=True, + timeout=(self.response_timeout, timeout), + ) + create_response = self.session.post( + url=urljoin(self.base_url, self.CREATE_TASK_URL), + json=request, + timeout=self.response_timeout, + ).json() + self._check_response(create_response) + for line in r.iter_lines(): + content = line.decode("utf-8") + if '"host":"smee.io"' not in content: + continue + payload = json.loads(content.split(":", maxsplit=1)[1].strip()) + if "taskId" not in payload["body"] or str(payload["body"]["taskId"]) != str(create_response["taskId"]): + continue + r.close() + job = Job(client=self, task_id=create_response["taskId"]) + job._last_result = payload["body"] + return job + raise AnticaptchaException(None, 250, "No matching task response received from smee.io") + + def getTaskResult(self, task_id: int) -> dict[str, Any]: + request = {"clientKey": self.client_key, "taskId": task_id} + response = self.session.post(urljoin(self.base_url, self.TASK_RESULT_URL), json=request).json() + self._check_response(response) + return response + + def getBalance(self) -> float: + request = { + "clientKey": self.client_key, + "softId": self.SOFT_ID, + } + response = self.session.post(urljoin(self.base_url, self.BALANCE_URL), json=request).json() + self._check_response(response) + return response["balance"] + + def getAppStats(self, soft_id: int, mode: str) -> dict[str, Any]: + request = {"clientKey": self.client_key, "softId": soft_id, "mode": mode} + response = self.session.post(urljoin(self.base_url, self.APP_STAT_URL), json=request).json() + self._check_response(response) + return response + + def reportIncorrectImage(self, task_id: int) -> bool: + request = {"clientKey": self.client_key, "taskId": task_id} + response = self.session.post(urljoin(self.base_url, self.REPORT_IMAGE_URL), json=request).json() + self._check_response(response) + return bool(response.get("status", False)) + + def reportIncorrectRecaptcha(self, task_id: int) -> bool: + request = {"clientKey": self.client_key, "taskId": task_id} + response = self.session.post(urljoin(self.base_url, self.REPORT_RECAPTCHA_URL), json=request).json() + self._check_response(response) + return response["status"] == "success" + + # Snake_case aliases + create_task = createTask + create_task_smee = createTaskSmee + get_task_result = getTaskResult + get_balance = getBalance + get_app_stats = getAppStats + report_incorrect_image = reportIncorrectImage + report_incorrect_recaptcha = reportIncorrectRecaptcha diff --git a/tests/test_async_client.py b/tests/test_async_client.py new file mode 100644 index 0000000..97b523d --- /dev/null +++ b/tests/test_async_client.py @@ -0,0 +1,293 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from python_anticaptcha.async_client import ( + SLEEP_EVERY_CHECK_FINISHED, + AsyncAnticaptchaClient, + AsyncJob, +) +from python_anticaptcha.exceptions import AnticaptchaException + + +class TestAsyncAnticaptchaClientInit: + def test_https_url(self): + client = AsyncAnticaptchaClient("key123") + assert client.base_url == "https://api.anti-captcha.com/" + assert client.client_key == "key123" + + def test_http_url(self): + client = AsyncAnticaptchaClient("key123", use_ssl=False) + assert client.base_url == "http://api.anti-captcha.com/" + + def test_custom_host(self): + client = AsyncAnticaptchaClient("key123", host="custom.host.com") + assert client.base_url == "https://custom.host.com/" + + def test_language_pool(self): + client = AsyncAnticaptchaClient("key123", language_pool="rn") + assert client.language_pool == "rn" + + def test_env_var_fallback(self, monkeypatch): + monkeypatch.setenv("ANTICAPTCHA_API_KEY", "env-key-456") + client = AsyncAnticaptchaClient() + assert client.client_key == "env-key-456" + + def test_explicit_key_over_env(self, monkeypatch): + monkeypatch.setenv("ANTICAPTCHA_API_KEY", "env-key-456") + client = AsyncAnticaptchaClient("explicit-key-789") + assert client.client_key == "explicit-key-789" + + def test_no_key_raises(self, monkeypatch): + monkeypatch.delenv("ANTICAPTCHA_API_KEY", raising=False) + with pytest.raises(AnticaptchaException) as exc_info: + AsyncAnticaptchaClient() + assert exc_info.value.error_code == "CONFIG_ERROR" + + +class TestAsyncCheckResponse: + def setup_method(self): + self.client = AsyncAnticaptchaClient("key123") + + @pytest.mark.asyncio + async def test_success_passthrough(self): + response = {"errorId": 0, "taskId": 42} + await self.client._check_response(response) + + @pytest.mark.asyncio + async def test_error_raises(self): + response = { + "errorId": 1, + "errorCode": "ERROR_KEY_DOES_NOT_EXIST", + "errorDescription": "Account authorization key not found", + } + with pytest.raises(AnticaptchaException) as exc_info: + await self.client._check_response(response) + assert exc_info.value.error_id == 1 + assert exc_info.value.error_code == "ERROR_KEY_DOES_NOT_EXIST" + + @pytest.mark.asyncio + async def test_error_id_11_appends_ip(self): + response = { + "errorId": 11, + "errorCode": "ERROR_IP_NOT_ALLOWED", + "errorDescription": "IP not allowed", + } + self.client._get_client_ip = AsyncMock(return_value="5.6.7.8") + with pytest.raises(AnticaptchaException) as exc_info: + await self.client._check_response(response) + assert "5.6.7.8" in exc_info.value.error_description + + +class TestAsyncCreateTask: + @pytest.mark.asyncio + async def test_payload_structure(self): + client = AsyncAnticaptchaClient("key123") + mock_task = MagicMock() + mock_task.serialize.return_value = {"type": "NoCaptchaTaskProxyless", "websiteURL": "https://example.com"} + + mock_response = MagicMock() + mock_response.json.return_value = {"errorId": 0, "taskId": 99} + + client.session.post = AsyncMock(return_value=mock_response) + job = await client.createTask(mock_task) + + call_kwargs = client.session.post.call_args + payload = call_kwargs[1]["json"] if "json" in call_kwargs[1] else call_kwargs.kwargs["json"] + assert payload["clientKey"] == "key123" + assert payload["task"] == {"type": "NoCaptchaTaskProxyless", "websiteURL": "https://example.com"} + assert payload["softId"] == 847 + assert payload["languagePool"] == "en" + assert isinstance(job, AsyncJob) + assert job.task_id == 99 + + +class TestAsyncGetBalance: + @pytest.mark.asyncio + async def test_returns_balance(self): + client = AsyncAnticaptchaClient("key123") + mock_response = MagicMock() + mock_response.json.return_value = {"errorId": 0, "balance": 3.21} + + client.session.post = AsyncMock(return_value=mock_response) + balance = await client.getBalance() + assert balance == 3.21 + + +class TestAsyncJobCheckIsReady: + @pytest.mark.asyncio + async def test_ready(self): + client = MagicMock() + client.getTaskResult = AsyncMock(return_value={"status": "ready", "solution": {}}) + job = AsyncJob(client, task_id=1) + assert await job.check_is_ready() is True + + @pytest.mark.asyncio + async def test_processing(self): + client = MagicMock() + client.getTaskResult = AsyncMock(return_value={"status": "processing"}) + job = AsyncJob(client, task_id=1) + assert await job.check_is_ready() is False + + +class TestAsyncJobSolutionGetters: + def setup_method(self): + self.client = MagicMock() + self.job = AsyncJob(self.client, task_id=1) + self.job._last_result = { + "status": "ready", + "solution": { + "gRecaptchaResponse": "recaptcha-token", + "token": "funcaptcha-token", + "text": "captcha text", + "cellNumbers": [1, 3, 5], + "answers": {"q1": "a1"}, + }, + } + + def test_get_solution_response(self): + assert self.job.get_solution_response() == "recaptcha-token" + + def test_get_token_response(self): + assert self.job.get_token_response() == "funcaptcha-token" + + def test_get_captcha_text(self): + assert self.job.get_captcha_text() == "captcha text" + + def test_get_cells_numbers(self): + assert self.job.get_cells_numbers() == [1, 3, 5] + + def test_get_answers(self): + assert self.job.get_answers() == {"q1": "a1"} + + def test_get_solution(self): + assert self.job.get_solution() == self.job._last_result["solution"] + + +class TestAsyncJobJoinTimeout: + @pytest.mark.asyncio + @patch("python_anticaptcha.async_client.asyncio.sleep", new_callable=AsyncMock) + async def test_timeout_raises(self, mock_sleep): + client = MagicMock() + client.getTaskResult = AsyncMock(return_value={"status": "processing"}) + job = AsyncJob(client, task_id=1) + with pytest.raises(AnticaptchaException) as exc_info: + await job.join(maximum_time=SLEEP_EVERY_CHECK_FINISHED) + assert "exceeded" in str(exc_info.value).lower() + + +class TestAsyncJobJoinOnCheck: + @pytest.mark.asyncio + @patch("python_anticaptcha.async_client.asyncio.sleep", new_callable=AsyncMock) + async def test_on_check_called_each_iteration(self, mock_sleep): + client = MagicMock() + client.getTaskResult = AsyncMock( + side_effect=[ + {"status": "processing"}, + {"status": "processing"}, + {"status": "ready", "solution": {}}, + ] + ) + job = AsyncJob(client, task_id=1) + callback = MagicMock() + await job.join(on_check=callback) + assert callback.call_count == 2 + callback.assert_any_call(SLEEP_EVERY_CHECK_FINISHED, "processing") + callback.assert_any_call(SLEEP_EVERY_CHECK_FINISHED * 2, "processing") + + @pytest.mark.asyncio + @patch("python_anticaptcha.async_client.asyncio.sleep", new_callable=AsyncMock) + async def test_on_check_none_by_default(self, mock_sleep): + client = MagicMock() + client.getTaskResult = AsyncMock(return_value={"status": "ready", "solution": {}}) + job = AsyncJob(client, task_id=1) + await job.join() + + @pytest.mark.asyncio + @patch("python_anticaptcha.async_client.asyncio.sleep", new_callable=AsyncMock) + async def test_on_check_not_called_when_immediately_ready(self, mock_sleep): + client = MagicMock() + client.getTaskResult = AsyncMock(return_value={"status": "ready", "solution": {}}) + job = AsyncJob(client, task_id=1) + callback = MagicMock() + await job.join(on_check=callback) + callback.assert_not_called() + + +class TestAsyncContextManager: + @pytest.mark.asyncio + async def test_aenter_returns_self(self): + client = AsyncAnticaptchaClient("key123") + assert await client.__aenter__() is client + + @pytest.mark.asyncio + async def test_aexit_closes_session(self): + client = AsyncAnticaptchaClient("key123") + client.session.aclose = AsyncMock() + await client.__aexit__(None, None, None) + client.session.aclose.assert_called_once() + + @pytest.mark.asyncio + async def test_close_closes_session(self): + client = AsyncAnticaptchaClient("key123") + client.session.aclose = AsyncMock() + await client.close() + client.session.aclose.assert_called_once() + + @pytest.mark.asyncio + async def test_async_with_statement(self): + async with AsyncAnticaptchaClient("key123") as client: + assert isinstance(client, AsyncAnticaptchaClient) + + @pytest.mark.asyncio + async def test_aexit_returns_false(self): + client = AsyncAnticaptchaClient("key123") + client.session.aclose = AsyncMock() + result = await client.__aexit__(None, None, None) + assert result is False + + +class TestAsyncSnakeCaseAliases: + def test_create_task_alias(self): + assert AsyncAnticaptchaClient.create_task is AsyncAnticaptchaClient.createTask + + def test_get_task_result_alias(self): + assert AsyncAnticaptchaClient.get_task_result is AsyncAnticaptchaClient.getTaskResult + + def test_get_balance_alias(self): + assert AsyncAnticaptchaClient.get_balance is AsyncAnticaptchaClient.getBalance + + def test_get_app_stats_alias(self): + assert AsyncAnticaptchaClient.get_app_stats is AsyncAnticaptchaClient.getAppStats + + def test_report_incorrect_image_alias(self): + assert AsyncAnticaptchaClient.report_incorrect_image is AsyncAnticaptchaClient.reportIncorrectImage + + def test_report_incorrect_recaptcha_alias(self): + assert AsyncAnticaptchaClient.report_incorrect_recaptcha is AsyncAnticaptchaClient.reportIncorrectRecaptcha + + @pytest.mark.asyncio + async def test_alias_works_on_instance(self): + client = AsyncAnticaptchaClient("key123") + mock_response = MagicMock() + mock_response.json.return_value = {"errorId": 0, "balance": 5.0} + client.session.post = AsyncMock(return_value=mock_response) + balance = await client.get_balance() + assert balance == 5.0 + + +class TestAsyncRepr: + def test_client_repr(self): + client = AsyncAnticaptchaClient("key123") + assert repr(client) == "" + + def test_job_repr_no_result(self): + client = MagicMock() + job = AsyncJob(client, task_id=42) + assert repr(job) == "" + + def test_job_repr_with_status(self): + client = MagicMock() + job = AsyncJob(client, task_id=42) + job._last_result = {"status": "ready"} + assert repr(job) == "" diff --git a/tests/test_examples.py b/tests/test_examples.py index d7f6c0e..5bd8054 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -22,9 +22,9 @@ def missing_proxy(*args, **kwargs): class AntiGateTestCase(TestCase): @retry(tries=3) def test_process_antigate(self): - from examples import antigate + from examples import sync_antigate - solution = antigate.process() + solution = sync_antigate.process() for key in ["url", "domain", "localStorage", "cookies", "fingerprint"]: self.assertIn(key, solution) @@ -37,9 +37,9 @@ class FuncaptchaTestCase(TestCase): # Occasionally fails, so I repeat my attempt to have others selected. @retry(tries=3) def test_funcaptcha(self): - from examples import funcaptcha_request + from examples import sync_funcaptcha_request - self.assertIn("Solved!", funcaptcha_request.process()) + self.assertIn("Solved!", sync_funcaptcha_request.process()) @pytest.mark.e2e @@ -48,9 +48,9 @@ class RecaptchaRequestTestCase(TestCase): # Anticaptcha responds is not fully reliable. @retry(tries=6) def test_process(self): - from examples import recaptcha_request + from examples import sync_recaptcha_request - self.assertIn(recaptcha_request.EXPECTED_RESULT, recaptcha_request.process()) + self.assertIn(sync_recaptcha_request.EXPECTED_RESULT, sync_recaptcha_request.process()) @pytest.mark.e2e @@ -60,9 +60,9 @@ class RecaptchaV3ProxylessTestCase(TestCase): # Anticaptcha responds is not fully reliable. @retry(tries=3) def test_process(self): - from examples import recaptcha3_request + from examples import sync_recaptcha3_request - self.assertTrue(recaptcha3_request.process()["success"]) + self.assertTrue(sync_recaptcha3_request.process()["success"]) @contextmanager @@ -84,7 +84,7 @@ class RecaptchaSeleniumtTestCase(TestCase): def test_process(self): from selenium.webdriver.chrome.options import Options - from examples import recaptcha_selenium + from examples import sync_recaptcha_selenium options = Options() options.headless = True @@ -93,16 +93,16 @@ def test_process(self): with open_driver( options=options, ) as driver: - self.assertIn(recaptcha_selenium.EXPECTED_RESULT, recaptcha_selenium.process(driver)) + self.assertIn(sync_recaptcha_selenium.EXPECTED_RESULT, sync_recaptcha_selenium.process(driver)) @pytest.mark.e2e @missing_key class TextTestCase(TestCase): def test_process(self): - from examples import text + from examples import sync_text - self.assertEqual(text.process(text.IMAGE).lower(), text.EXPECTED_RESULT.lower()) + self.assertEqual(sync_text.process(sync_text.IMAGE).lower(), sync_text.EXPECTED_RESULT.lower()) @pytest.mark.e2e @@ -111,9 +111,9 @@ def test_process(self): class HCaptchaTaskProxylessTestCase(TestCase): @retry(tries=3) def test_process(self): - from examples import hcaptcha_request + from examples import sync_hcaptcha_request - self.assertIn(hcaptcha_request.EXPECTED_RESULT, hcaptcha_request.process()) + self.assertIn(sync_hcaptcha_request.EXPECTED_RESULT, sync_hcaptcha_request.process()) @pytest.mark.e2e @@ -122,9 +122,9 @@ def test_process(self): class HCaptchaTaskTestCase(TestCase): @retry(tries=3) def test_process(self): - from examples import hcaptcha_request_proxy + from examples import sync_hcaptcha_request_proxy self.assertIn( "Your request have submitted successfully.", - hcaptcha_request_proxy.process(), + sync_hcaptcha_request_proxy.process(), ) diff --git a/tests/test_base.py b/tests/test_sync_client.py similarity index 95% rename from tests/test_base.py rename to tests/test_sync_client.py index 98b0b78..812f7a6 100644 --- a/tests/test_base.py +++ b/tests/test_sync_client.py @@ -2,8 +2,8 @@ import pytest -from python_anticaptcha.base import SLEEP_EVERY_CHECK_FINISHED, AnticaptchaClient, Job from python_anticaptcha.exceptions import AnticaptchaException +from python_anticaptcha.sync_client import SLEEP_EVERY_CHECK_FINISHED, AnticaptchaClient, Job class TestAnticaptchaClientInit: @@ -154,7 +154,7 @@ def test_get_solution(self): class TestJobJoinTimeout: - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_timeout_raises(self, mock_sleep): client = MagicMock() client.getTaskResult.return_value = {"status": "processing"} @@ -165,7 +165,7 @@ def test_timeout_raises(self, mock_sleep): class TestJobJoinOnCheck: - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_on_check_called_each_iteration(self, mock_sleep): client = MagicMock() # Return processing twice, then ready @@ -182,7 +182,7 @@ def test_on_check_called_each_iteration(self, mock_sleep): callback.assert_any_call(SLEEP_EVERY_CHECK_FINISHED, "processing") callback.assert_any_call(SLEEP_EVERY_CHECK_FINISHED * 2, "processing") - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_on_check_none_by_default(self, mock_sleep): client = MagicMock() client.getTaskResult.return_value = {"status": "ready", "solution": {}} @@ -190,7 +190,7 @@ def test_on_check_none_by_default(self, mock_sleep): # Should not raise when on_check is not provided job.join() - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_on_check_not_called_when_immediately_ready(self, mock_sleep): client = MagicMock() client.getTaskResult.return_value = {"status": "ready", "solution": {}} @@ -201,7 +201,7 @@ def test_on_check_not_called_when_immediately_ready(self, mock_sleep): class TestJobJoinBackoff: - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_backoff_sleep_schedule(self, mock_sleep): client = MagicMock() # processing 6 times then ready — enough to hit the 10s cap @@ -219,7 +219,7 @@ def test_backoff_sleep_schedule(self, mock_sleep): sleep_values = [call.args[0] for call in mock_sleep.call_args_list] assert sleep_values == [1, 2, 4, 8, 10, 10] - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_backoff_false_uses_fixed_interval(self, mock_sleep): client = MagicMock() client.getTaskResult.side_effect = [ @@ -233,7 +233,7 @@ def test_backoff_false_uses_fixed_interval(self, mock_sleep): sleep_values = [call.args[0] for call in mock_sleep.call_args_list] assert sleep_values == [SLEEP_EVERY_CHECK_FINISHED] * 3 - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_backoff_timeout_still_works(self, mock_sleep): client = MagicMock() client.getTaskResult.return_value = {"status": "processing"} @@ -242,7 +242,7 @@ def test_backoff_timeout_still_works(self, mock_sleep): job.join(maximum_time=5, backoff=True) assert "exceeded" in str(exc_info.value).lower() - @patch("python_anticaptcha.base.time.sleep") + @patch("python_anticaptcha.sync_client.time.sleep") def test_backoff_with_on_check(self, mock_sleep): client = MagicMock() client.getTaskResult.side_effect = [