diff --git a/.idea/FastFetchBot.iml b/.idea/FastFetchBot.iml
index d9186b6..e23aeab 100644
--- a/.idea/FastFetchBot.iml
+++ b/.idea/FastFetchBot.iml
@@ -7,6 +7,7 @@
+
diff --git a/.idea/runConfigurations/fullstack_polling.xml b/.idea/runConfigurations/fullstack_polling_api.xml
similarity index 56%
rename from .idea/runConfigurations/fullstack_polling.xml
rename to .idea/runConfigurations/fullstack_polling_api.xml
index 309f504..0372403 100644
--- a/.idea/runConfigurations/fullstack_polling.xml
+++ b/.idea/runConfigurations/fullstack_polling_api.xml
@@ -1,8 +1,8 @@
-
+
-
+
\ No newline at end of file
diff --git a/CLAUDE.md b/CLAUDE.md
index 50e519b..9e8f9e0 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -2,7 +2,7 @@
## Project Overview
-FastFetchBot is a social media content fetching service built as a **UV workspace monorepo** with three microservices: a FastAPI server (API), a Telegram Bot client, and a Celery worker for file operations. It scrapes and archives content from various social media platforms including Twitter, Weibo, Xiaohongshu, Reddit, Bluesky, Instagram, Zhihu, Douban, YouTube, and Bilibili.
+FastFetchBot is a social media content fetching service built as a **UV workspace monorepo** with four microservices: a FastAPI server (API), a Telegram Bot client, a Celery worker for file operations, and an ARQ-based async worker for off-path scraping. It scrapes and archives content from various social media platforms including Twitter, Weibo, Xiaohongshu, Reddit, Bluesky, Instagram, Zhihu, Douban, YouTube, and Bilibili.
## Architecture
@@ -23,11 +23,13 @@ FastFetchBot/
│ │ ├── twitter/ bluesky/ weibo/ xiaohongshu/ reddit/
│ │ ├── instagram/ zhihu/ douban/ threads/ wechat/
│ │ └── general/ # Firecrawl + Zyte generic scraping
+│ ├── file_export/ # Async Celery task wrappers (PDF, video, audio transcription)
│ └── telegraph/ # Telegraph content publishing
-├── packages/file-export/ # fastfetchbot-file-export: video download, PDF export, transcription
+├── packages/file-export/ # fastfetchbot-file-export: synchronous Celery worker jobs (yt-dlp, WeasyPrint, OpenAI)
├── apps/api/ # FastAPI server: enriched service, routing, storage
├── apps/telegram-bot/ # Telegram Bot: webhook/polling, message handling
-├── apps/worker/ # Celery worker: async file operations (video, PDF, audio)
+├── apps/worker/ # Celery worker: sync file operations (video, PDF, audio)
+├── apps/async-worker/ # ARQ async worker: off-path scraping + enrichment
├── pyproject.toml # Root workspace configuration
└── uv.lock # Lockfile for the entire workspace
```
@@ -37,6 +39,7 @@ FastFetchBot/
| **API Server** (`apps/api/src/`) | `fastfetchbot-api` | 10450 | `gunicorn -k uvicorn.workers.UvicornWorker src.main:app --preload` |
| **Telegram Bot** (`apps/telegram-bot/core/`) | `fastfetchbot-telegram-bot` | 10451 | `python -m core.main` |
| **Worker** (`apps/worker/worker_core/`) | `fastfetchbot-worker` | — | `celery -A worker_core.main:app worker --loglevel=info --concurrency=2` |
+| **Async Worker** (`apps/async-worker/async_worker/`) | `fastfetchbot-async-worker` | — | `arq async_worker.main.WorkerSettings` |
| **Shared Library** (`packages/shared/fastfetchbot_shared/`) | `fastfetchbot-shared` | — | — |
| **File Export Library** (`packages/file-export/fastfetchbot_file_export/`) | `fastfetchbot-file-export` | — | — |
@@ -74,6 +77,7 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`).
- **`templates/`** — 13 Jinja2 templates for platform-specific output formatting (bundled via `__file__`-relative paths)
- **Platform modules**: `twitter/`, `bluesky/`, `weibo/`, `xiaohongshu/`, `reddit/`, `instagram/`, `zhihu/`, `douban/`, `threads/`, `wechat/`, `general/` (Firecrawl + Zyte)
- **`services/telegraph/`** — Telegraph content publishing (creates telegra.ph pages from scraped content)
+- **`services/file_export/`** — Async Celery task wrappers for PDF export, video download, and audio transcription. These accept `celery_app` and `timeout` as constructor parameters (dependency injection) so any app can use them with its own Celery client
The shared scrapers library can be used standalone without the API server:
```python
@@ -179,6 +183,8 @@ GitHub Actions (`.github/workflows/ci.yml`) builds and pushes all three images o
7. Add any new pip dependencies to `packages/shared/pyproject.toml` under `[project.optional-dependencies] scrapers`
### Key Conventions
+- **`packages/shared/` (`fastfetchbot-shared`)** is for shared async logic — scrapers, templates, Telegraph, and async Celery task wrappers (file_export). Most code here is async and reusable across apps
+- **`packages/file-export/` (`fastfetchbot-file-export`)** is exclusively for synchronous Celery worker jobs — the heavy I/O operations that run inside the Celery worker process (yt-dlp video download, WeasyPrint PDF generation, OpenAI audio transcription). Apps never import this package directly; they use the async wrappers in `fastfetchbot_shared.services.file_export` which submit tasks to the Celery worker
- **Scrapers, templates, and Telegraph live in `packages/shared/`** — they are framework-agnostic and reusable
- Scraper config (platform credentials, Firecrawl/Zyte settings) lives in `fastfetchbot_shared.services.scrapers.config`, **not** in `apps/api/src/config.py`
- API-only config (BASE_URL, MongoDB, Celery, AWS, Inoreader) stays in `apps/api/src/config.py`
diff --git a/apps/api/src/services/file_export/audio_transcribe/__init__.py b/apps/api/src/services/file_export/audio_transcribe/__init__.py
index 2f811ca..810d52f 100644
--- a/apps/api/src/services/file_export/audio_transcribe/__init__.py
+++ b/apps/api/src/services/file_export/audio_transcribe/__init__.py
@@ -1,29 +1,16 @@
-import asyncio
+"""API-layer audio transcription — wraps the shared AudioTranscribe with API config."""
-from src.config import DOWNLOAD_VIDEO_TIMEOUT
+from fastfetchbot_shared.services.file_export.audio_transcribe import AudioTranscribe as BaseAudioTranscribe
from src.services.celery_client import celery_app
-from fastfetchbot_shared.utils.logger import logger
+from src.config import DOWNLOAD_VIDEO_TIMEOUT
-class AudioTranscribe:
- def __init__(self, audio_file: str):
- self.audio_file = audio_file
+class AudioTranscribe(BaseAudioTranscribe):
+ """API AudioTranscribe that injects the API's Celery app and timeout."""
- async def transcribe(self):
- return await self._get_audio_text(self.audio_file)
-
- @staticmethod
- async def _get_audio_text(audio_file: str):
- logger.info(f"submitting transcribe task: {audio_file}")
- result = celery_app.send_task("file_export.transcribe", kwargs={
- "audio_file": audio_file,
- })
- try:
- response = await asyncio.to_thread(result.get, timeout=int(DOWNLOAD_VIDEO_TIMEOUT))
- return response["transcript"]
- except Exception:
- logger.exception(
- f"file_export.transcribe task failed: audio_file={audio_file}, "
- f"timeout={DOWNLOAD_VIDEO_TIMEOUT}"
- )
- raise
+ def __init__(self, audio_file: str):
+ super().__init__(
+ audio_file=audio_file,
+ celery_app=celery_app,
+ timeout=DOWNLOAD_VIDEO_TIMEOUT,
+ )
diff --git a/apps/api/src/services/file_export/document_export/pdf_export.py b/apps/api/src/services/file_export/document_export/pdf_export.py
index 111c3a4..4a0193c 100644
--- a/apps/api/src/services/file_export/document_export/pdf_export.py
+++ b/apps/api/src/services/file_export/document_export/pdf_export.py
@@ -1,10 +1,10 @@
-import asyncio
-import uuid
+"""API-layer PDF export — extends the shared PdfExport with S3 upload support."""
+
from pathlib import Path
import aiofiles.os
-from bs4 import BeautifulSoup
+from fastfetchbot_shared.services.file_export.pdf_export import PdfExport as BasePdfExport, wrap_html_string
from src.config import DOWNLOAD_VIDEO_TIMEOUT, AWS_STORAGE_ON
from src.services.celery_client import celery_app
from src.services.amazon.s3 import upload as upload_to_s3
@@ -19,48 +19,23 @@ async def upload_file_to_s3(output_filename):
)
-class PdfExport:
+class PdfExport(BasePdfExport):
+ """API PDF export that adds optional S3 upload after Celery PDF generation."""
+
def __init__(self, title: str, html_string: str = None):
- self.title = title
- self.html_string = html_string
+ super().__init__(
+ title=title,
+ html_string=html_string,
+ celery_app=celery_app,
+ timeout=DOWNLOAD_VIDEO_TIMEOUT,
+ )
async def export(self) -> str:
- html_string = self.wrap_html_string(self.html_string)
- output_filename = f"{self.title}-{uuid.uuid4()}.pdf"
-
- logger.info(f"submitting pdf export task: {output_filename}")
- result = celery_app.send_task("file_export.pdf_export", kwargs={
- "html_string": html_string,
- "output_filename": output_filename,
- })
- try:
- response = await asyncio.to_thread(result.get, timeout=int(DOWNLOAD_VIDEO_TIMEOUT))
- output_filename = response["output_filename"]
- except Exception:
- logger.exception(
- f"file_export.pdf_export task failed: output_filename={output_filename}, "
- f"timeout={DOWNLOAD_VIDEO_TIMEOUT}"
- )
- raise
- logger.info(f"pdf export success: {output_filename}")
+ output_filename = await super().export()
if AWS_STORAGE_ON:
local_filename = output_filename
output_filename = await upload_file_to_s3(Path(output_filename))
await aiofiles.os.remove(local_filename)
- return output_filename
- @staticmethod
- def wrap_html_string(html_string: str) -> str:
- soup = BeautifulSoup(
- '
'
- '',
- "html.parser",
- )
- soup.body.append(BeautifulSoup(html_string, "html.parser"))
- for tag in soup.find_all(True):
- if "style" in tag.attrs:
- del tag["style"]
- for style_tag in soup.find_all("style"):
- style_tag.decompose()
- return soup.prettify()
+ return output_filename
diff --git a/apps/api/src/services/file_export/video_download/__init__.py b/apps/api/src/services/file_export/video_download/__init__.py
index 0dda8e1..a8a2e48 100644
--- a/apps/api/src/services/file_export/video_download/__init__.py
+++ b/apps/api/src/services/file_export/video_download/__init__.py
@@ -1,237 +1,35 @@
-import asyncio
-from typing import Any, Optional
+"""API-layer video downloader — wraps the shared VideoDownloader with API config."""
-import httpx
-from urllib.parse import urlparse, parse_qs
+from typing import Any, Optional
-from fastfetchbot_shared.models.metadata_item import MetadataItem, MessageType, MediaFile
-from src.services.file_export.audio_transcribe import AudioTranscribe
+from fastfetchbot_shared.services.file_export.video_download import VideoDownloader as BaseVideoDownloader
from src.services.celery_client import celery_app
from src.config import DOWNLOAD_VIDEO_TIMEOUT
-from fastfetchbot_shared.utils.parse import unix_timestamp_to_utc, second_to_time, wrap_text_into_html
-from fastfetchbot_shared.utils.logger import logger
-from fastfetchbot_shared.services.scrapers.config import JINJA2_ENV
-video_info_template = JINJA2_ENV.get_template("video_info.jinja2")
+class VideoDownloader(BaseVideoDownloader):
+ """API VideoDownloader that injects the API's Celery app and timeout."""
-class VideoDownloader(MetadataItem):
def __init__(
- self,
- url: str,
- category: str,
- data: Optional[Any] = None,
- download: bool = True,
- audio_only: bool = False,
- hd: bool = False,
- transcribe: bool = False,
- **kwargs,
+ self,
+ url: str,
+ category: str,
+ data: Optional[Any] = None,
+ download: bool = True,
+ audio_only: bool = False,
+ hd: bool = False,
+ transcribe: bool = False,
+ **kwargs,
):
- self.extractor = category
- self.url = url
- self.author_url = ""
- self.download = download
- self.audio_only = audio_only
- self.transcribe = transcribe
- self.hd = hd
- self.message_type = MessageType.SHORT
- self.file_path = None
- # metadata variables
- self.category = category
- self.media_files = []
- # auxiliary variables
- self.created = None
- self.duration = None
-
- @classmethod
- async def create(cls, *args, **kwargs):
- instance = cls(*args, **kwargs)
- instance.url = await instance._parse_url(instance.url)
- return instance
-
- async def get_item(self) -> dict:
- self.url = await self._parse_url(self.url)
- await self.get_video()
- return self.to_dict()
-
- async def get_video(self) -> None:
- content_info = await self.get_video_info()
- self.file_path = content_info["file_path"]
- video_info_funcs = {
- "youtube": self._youtube_info_parse,
- "bilibili": self._bilibili_info_parse,
- }
- meta_info = video_info_funcs[self.extractor](content_info)
- self._video_info_formatting(meta_info)
- # AI transcribe
- if self.transcribe:
- audio_content_info = await self.get_video_info(audio_only=True)
- audio_file_path = audio_content_info["file_path"]
- audio_transcribe = AudioTranscribe(audio_file_path)
- transcribe_text = await audio_transcribe.transcribe()
- if self.download is False:
- self.message_type = MessageType.LONG
- self.text += "\nAI全文摘录:" + transcribe_text
- self.content += "
" + wrap_text_into_html(transcribe_text)
-
- async def _parse_url(self, url: str) -> str:
- async def _get_redirected_url(original_url: str) -> str:
- async with httpx.AsyncClient(follow_redirects=False) as client:
- resp = await client.get(original_url)
- if resp.status_code == 200:
- original_url = resp.url
- elif resp.status_code == 302:
- original_url = resp.headers["Location"]
- return original_url
-
- def _remove_youtube_link_tracing(original_url: str) -> str:
- original_url_parser = urlparse(original_url)
- original_url_hostname = str(original_url_parser.hostname)
-
- if "youtu.be" in original_url_hostname:
- # remove all queries
- original_url = original_url.split("?")[0]
- if "youtube.com" in original_url_hostname:
- # remove all queries except "?v=" part
- original_url = original_url_parser.scheme + "://" + original_url_parser.netloc + original_url_parser.path
- if original_url_parser.query:
- v_part_query = [item for item in original_url_parser.query.split("&") if "v=" in item]
- if v_part_query:
- original_url += "?" + v_part_query[0]
- return original_url
-
- def _remove_bilibili_link_tracing(original_url: str) -> str:
- original_url_parser = urlparse(original_url)
- original_url_hostname = str(original_url_parser.hostname)
- query_dict = parse_qs(original_url_parser.query)
- bilibili_p_query_string = "?p=" + query_dict["p"][0] if 'p' in query_dict else ""
-
- if "bilibili.com" in original_url_hostname:
- original_url = original_url_parser.scheme + "://" + original_url_parser.netloc + original_url_parser.path
- return original_url + bilibili_p_query_string
-
- logger.info(f"parsing original video url: {url} for {self.extractor}")
-
- url_parser = urlparse(url)
- url_hostname = str(url_parser.hostname)
-
- if self.extractor == "bilibili":
- if "b23.tv" in url_hostname:
- url = await _get_redirected_url(url)
- if "m.bilibili.com" in url_hostname:
- url = url.replace("m.bilibili.com", "www.bilibili.com")
- url = _remove_bilibili_link_tracing(url)
- elif self.extractor == "youtube":
- if "youtu.be" in url_hostname:
- url = await _get_redirected_url(url)
- url = _remove_youtube_link_tracing(url)
-
- logger.info(f"parsed video url: {url} for {self.extractor}")
- return url
-
- async def get_video_info(
- self,
- url: str = None,
- download: bool = None,
- extractor: str = None,
- audio_only: bool = None,
- hd: bool = None,
- ) -> dict:
- """
- Submit a Celery task to download/extract video info.
- :return: video info dict
- """
- if url is None:
- url = self.url
- if download is None:
- download = self.download
- if extractor is None:
- extractor = self.extractor
- if audio_only is None:
- audio_only = self.audio_only
- if hd is None:
- hd = self.hd
- body = {
- "url": url,
- "download": download,
- "extractor": extractor,
- "audio_only": audio_only,
- "hd": hd,
- }
- logger.info(f"submitting video download task: {body}")
- if download is True:
- logger.info("video downloading... it may take a while")
- if hd is True:
- logger.info("downloading HD video, it may take longer")
- elif audio_only is True:
- logger.info("downloading audio only")
- logger.debug(f"downloading video timeout: {DOWNLOAD_VIDEO_TIMEOUT}")
- result = celery_app.send_task("file_export.video_download", kwargs=body)
- try:
- response = await asyncio.to_thread(result.get, timeout=int(DOWNLOAD_VIDEO_TIMEOUT))
- content_info = response["content_info"]
- content_info["file_path"] = response["file_path"]
- return content_info
- except Exception:
- logger.exception(
- f"file_export.video_download task failed: url={url}, extractor={extractor}, "
- f"timeout={DOWNLOAD_VIDEO_TIMEOUT}"
- )
- raise
-
- def _video_info_formatting(self, meta_info: dict):
- self.title = meta_info["title"]
- self.author = meta_info["author"]
- self.author_url = meta_info["author_url"]
- if len(meta_info["description"]) > 800:
- meta_info["description"] = meta_info["description"][:800] + "..."
- self.created = meta_info["upload_date"]
- self.duration = meta_info["duration"]
- self.text = video_info_template.render(
- data={
- "url": self.url,
- "title": self.title,
- "author": self.author,
- "author_url": self.author_url,
- "duration": self.duration,
- "created": self.created,
- "playback_data": meta_info["playback_data"],
- "description": meta_info["description"],
- }
+ super().__init__(
+ url=url,
+ category=category,
+ celery_app=celery_app,
+ timeout=DOWNLOAD_VIDEO_TIMEOUT,
+ data=data,
+ download=download,
+ audio_only=audio_only,
+ hd=hd,
+ transcribe=transcribe,
+ **kwargs,
)
- self.content = self.text.replace("\n", "
")
- if self.download:
- media_type = "video"
- if self.audio_only:
- media_type = "audio"
- self.media_files = [MediaFile(media_type, self.file_path, "")]
-
- @staticmethod
- def _youtube_info_parse(video_info: dict) -> dict:
- return {
- "id": video_info["id"],
- "title": video_info["title"],
- "author": video_info["uploader"],
- "author_url": video_info["uploader_url"] or video_info["channel_url"],
- "description": video_info["description"],
- "playback_data": f"视频播放量:{video_info['view_count']} 评论数:{video_info['comment_count']}",
- "author_avatar": video_info["thumbnail"],
- "upload_date": str(video_info["upload_date"]),
- "duration": second_to_time(round(video_info["duration"])),
- }
-
- @staticmethod
- def _bilibili_info_parse(video_info: dict) -> dict:
- return {
- "id": video_info["id"],
- "title": video_info["title"],
- "author": video_info["uploader"],
- "author_url": "https://space.bilibili.com/"
- + str(video_info["uploader_id"]),
- "author_avatar": video_info["thumbnail"],
- "ext": video_info["ext"],
- "description": video_info["description"],
- "playback_data": f"视频播放量:{video_info['view_count']} 弹幕数:{video_info['comment_count']} 点赞数:{video_info['like_count']}",
- "upload_date": unix_timestamp_to_utc(video_info["timestamp"]),
- "duration": second_to_time(round(video_info["duration"])),
- }
diff --git a/apps/async-worker/Dockerfile b/apps/async-worker/Dockerfile
new file mode 100644
index 0000000..04177af
--- /dev/null
+++ b/apps/async-worker/Dockerfile
@@ -0,0 +1,55 @@

# `python-base` sets up all our shared environment variables
FROM python:3.12-slim AS python-base

ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    # uv settings
    UV_PROJECT_ENVIRONMENT="/opt/pysetup/.venv" \
    UV_COMPILE_BYTECODE=1 \
    UV_LINK_MODE=copy \
    # paths
    PYSETUP_PATH="/opt/pysetup" \
    VENV_PATH="/opt/pysetup/.venv"

# prepend venv to path
ENV PATH="$VENV_PATH/bin:$PATH"


# `builder-base` stage is used to build deps + create our virtual environment
FROM python-base AS builder-base

# install uv from the official image
COPY --from=ghcr.io/astral-sh/uv:0.10.4 /uv /usr/local/bin/uv

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
    curl \
    ca-certificates \
    build-essential \
    libffi-dev \
    && rm -rf /var/lib/apt/lists/*

# copy workspace files for dependency resolution
WORKDIR $PYSETUP_PATH
COPY pyproject.toml uv.lock ./
COPY packages/ packages/
COPY apps/async-worker/ apps/async-worker/

# install runtime deps (deps only: --no-install-project skips the app itself,
# --package restricts resolution to the async-worker workspace member)
RUN uv sync --frozen --no-dev --no-install-project --package fastfetchbot-async-worker


# `production` image used for runtime
FROM python-base AS production
# make the `async_worker` package importable regardless of process CWD
ENV PYTHONPATH=/app/apps/async-worker:$PYTHONPATH
# NOTE(review): libffi-dev installs headers; the runtime libffi library is
# likely sufficient for this stage — confirm before slimming further.
RUN apt-get update \
    && apt-get install --no-install-recommends -y \
    ca-certificates \
    libffi-dev \
    && rm -rf /var/lib/apt/lists/*
# bring in the pre-built virtualenv and workspace sources
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH
COPY packages/ /app/packages/
COPY apps/async-worker/ /app/apps/async-worker/
WORKDIR /app/apps/async-worker
CMD ["arq", "async_worker.main.WorkerSettings"]
diff --git a/apps/async-worker/async_worker/__init__.py b/apps/async-worker/async_worker/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/async-worker/async_worker/celery_client.py b/apps/async-worker/async_worker/celery_client.py
new file mode 100644
index 0000000..3f0d665
--- /dev/null
+++ b/apps/async-worker/async_worker/celery_client.py
@@ -0,0 +1,9 @@
"""Celery client for the async worker.

Used to *submit* tasks (e.g. PDF export in ``services.enrichment``) to the
existing Celery worker over the shared broker.
"""

from celery import Celery

from async_worker.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND

# Same app name as the main worker so tasks land on the shared queues.
celery_app = Celery(
    "fastfetchbot_worker",
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
)
diff --git a/apps/async-worker/async_worker/config.py b/apps/async-worker/async_worker/config.py
new file mode 100644
index 0000000..40cf333
--- /dev/null
+++ b/apps/async-worker/async_worker/config.py
@@ -0,0 +1,29 @@
"""Environment-driven configuration for the async worker.

All values are read once at import time from ``os.environ``.
"""

import os

from fastfetchbot_shared.utils.parse import get_env_bool

env = os.environ

# ARQ Redis (task queue the worker consumes)
ARQ_REDIS_URL = env.get("ARQ_REDIS_URL", "redis://localhost:6379/2")

# Outbox Redis (result delivery back to the Telegram bot)
OUTBOX_REDIS_URL = env.get("OUTBOX_REDIS_URL", "redis://localhost:6379/3")
OUTBOX_QUEUE_KEY = env.get("OUTBOX_QUEUE_KEY", "scrape:outbox")

# Celery (for PDF export tasks on existing worker)
CELERY_BROKER_URL = env.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = env.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")

# Feature flags (defaults mirror the API server's enrichment behavior)
STORE_TELEGRAPH = get_env_bool(env, "STORE_TELEGRAPH", True)
STORE_DOCUMENT = get_env_bool(env, "STORE_DOCUMENT", False)
DATABASE_ON = get_env_bool(env, "DATABASE_ON", False)

# MongoDB (optional, for DB storage)
MONGODB_HOST = env.get("MONGODB_HOST", "localhost")
MONGODB_PORT = int(env.get("MONGODB_PORT", 27017))
MONGODB_URL = env.get("MONGODB_URL", f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}")

# Timeout (seconds) for awaiting Celery task results — used for both PDF
# export (enrichment) and video download (scrape task).
DOWNLOAD_VIDEO_TIMEOUT = int(env.get("DOWNLOAD_VIDEO_TIMEOUT", 600))
diff --git a/apps/async-worker/async_worker/main.py b/apps/async-worker/async_worker/main.py
new file mode 100644
index 0000000..cbae148
--- /dev/null
+++ b/apps/async-worker/async_worker/main.py
@@ -0,0 +1,48 @@
+import asyncio
+
+from arq.connections import RedisSettings
+
+from async_worker.config import ARQ_REDIS_URL
+from async_worker.tasks.scrape import scrape_and_enrich
+
+# The twitter-api-client-v2 library installs uvloop's EventLoopPolicy at
+# import time (triggered transitively via InfoExtractService → twitter).
+# uvloop's policy raises RuntimeError from get_event_loop() if no loop
+# exists, unlike stdlib asyncio which lazily creates one. ARQ's
+# Worker.__init__ calls get_event_loop() before any loop is running.
+#
+# Fix: after all imports have completed (so uvloop policy is in place),
+# create and set a loop using uvloop's own new_event_loop().
try:
    asyncio.get_event_loop()
except RuntimeError:
    # No loop exists and the installed (uvloop) policy refuses to create one
    # implicitly — create and register one via the active policy so ARQ's
    # Worker.__init__ can retrieve it.
    asyncio.set_event_loop(asyncio.new_event_loop())
+
+
def parse_redis_url(url: str) -> RedisSettings:
    """Translate a ``redis://host:port/db`` URL into ARQ ``RedisSettings``.

    Missing components fall back to localhost:6379, database 0.
    """
    from urllib.parse import urlparse

    parts = urlparse(url)
    db_segment = parts.path.lstrip("/")
    return RedisSettings(
        host=parts.hostname or "localhost",
        port=parts.port or 6379,
        database=int(db_segment) if db_segment else 0,
        password=parts.password,
    )
+
+
class WorkerSettings:
    """ARQ worker configuration (consumed by `arq async_worker.main.WorkerSettings`)."""

    # Task functions this worker is allowed to execute.
    functions = [scrape_and_enrich]
    # Connection settings derived from ARQ_REDIS_URL at import time.
    redis_settings = parse_redis_url(ARQ_REDIS_URL)

    # Job timeout: 10 minutes (matches existing Celery soft limit)
    job_timeout = 600

    # Maximum concurrent jobs
    max_jobs = 10

    # Keep results for 1 hour
    keep_result = 3600
diff --git a/apps/async-worker/async_worker/services/__init__.py b/apps/async-worker/async_worker/services/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/async-worker/async_worker/services/enrichment.py b/apps/async-worker/async_worker/services/enrichment.py
new file mode 100644
index 0000000..5b796c6
--- /dev/null
+++ b/apps/async-worker/async_worker/services/enrichment.py
@@ -0,0 +1,66 @@
+"""Enrichment steps applied after scraping: Telegraph publishing and PDF export."""
+
+from fastfetchbot_shared.models.metadata_item import MessageType
+from fastfetchbot_shared.services.telegraph import Telegraph
+from fastfetchbot_shared.utils.logger import logger
+from async_worker.config import STORE_TELEGRAPH, STORE_DOCUMENT, DOWNLOAD_VIDEO_TIMEOUT
+
+
async def enrich(
    metadata_item: dict,
    store_telegraph: bool | None = None,
    store_document: bool | None = None,
) -> dict:
    """Apply enrichment steps to a scraped metadata item.

    - Telegraph publishing (best-effort: a failure leaves ``telegraph_url`` == "")
    - PDF export (via shared PdfExport → Celery worker)

    Args:
        metadata_item: Dict form of a scraped item; must contain "title" and
            "content". "media_files" is created if missing.
        store_telegraph: Override for the STORE_TELEGRAPH config flag
            (None = use config default).
        store_document: Override for the STORE_DOCUMENT config flag
            (None = use config default).

    Returns:
        The same dict, mutated in place.
    """
    if store_telegraph is None:
        store_telegraph = STORE_TELEGRAPH
    if store_document is None:
        store_document = STORE_DOCUMENT

    # Force Telegraph for long messages.
    # NOTE(review): metadata_item is a plain dict — confirm "message_type"
    # survives as a MessageType enum rather than its serialized value,
    # otherwise this comparison never matches.
    if metadata_item.get("message_type") == MessageType.LONG:
        store_telegraph = True
        logger.info("Message type is long, forcing Telegraph publish")

    # Telegraph publishing
    if store_telegraph:
        telegraph_item = Telegraph.from_dict(metadata_item)
        try:
            telegraph_url = await telegraph_item.get_telegraph()
        except Exception as e:
            # Best-effort: the empty URL triggers the PDF fallback below.
            logger.error(f"Error publishing to Telegraph: {e}")
            telegraph_url = ""
        metadata_item["telegraph_url"] = telegraph_url

    # PDF export when explicitly requested, or as a fallback when Telegraph
    # was attempted but yielded no URL. (`A or (not A and B)` simplified to
    # `A or B`; an *absent* key — Telegraph disabled — deliberately does not
    # trigger the fallback.)
    if store_document or metadata_item.get("telegraph_url") == "":
        logger.info("Exporting to PDF via Celery worker")
        try:
            # Imported lazily so the Celery client is only constructed when
            # PDF export is actually needed.
            from fastfetchbot_shared.services.file_export.pdf_export import PdfExport
            from async_worker.celery_client import celery_app

            pdf_export = PdfExport(
                title=metadata_item["title"],
                html_string=metadata_item["content"],
                celery_app=celery_app,
                timeout=DOWNLOAD_VIDEO_TIMEOUT,
            )
            output_filename = await pdf_export.export()
            # setdefault: don't let a scraper that omitted "media_files" turn
            # a successful export into a silently-swallowed KeyError.
            metadata_item.setdefault("media_files", []).append(
                {
                    "media_type": "document",
                    "url": output_filename,
                    "caption": "",
                }
            )
        except Exception as e:
            logger.error(f"Error exporting PDF: {e}")

    metadata_item["title"] = metadata_item["title"].strip()
    return metadata_item
diff --git a/apps/async-worker/async_worker/services/outbox.py b/apps/async-worker/async_worker/services/outbox.py
new file mode 100644
index 0000000..bd25345
--- /dev/null
+++ b/apps/async-worker/async_worker/services/outbox.py
@@ -0,0 +1,44 @@
+import json
+
+import redis.asyncio as aioredis
+
+from async_worker.config import OUTBOX_REDIS_URL, OUTBOX_QUEUE_KEY
+from fastfetchbot_shared.utils.logger import logger
+
# Lazily-created, module-wide outbox connection (see get_outbox_redis/close).
_redis: aioredis.Redis | None = None


async def get_outbox_redis() -> aioredis.Redis:
    """Get or create the outbox Redis connection (cached at module level)."""
    global _redis
    if _redis is None:
        # decode_responses=True so the client deals in str, matching the
        # JSON-string payloads produced by push().
        _redis = aioredis.from_url(OUTBOX_REDIS_URL, decode_responses=True)
    return _redis
+
+
async def push(
    job_id: str,
    chat_id: int | str,
    metadata_item: dict | None = None,
    message_id: int | None = None,
    error: str | None = None,
) -> None:
    """Serialize a job result and LPUSH it onto the Redis outbox queue."""
    message = json.dumps(
        {
            "job_id": job_id,
            "chat_id": chat_id,
            "message_id": message_id,
            "metadata_item": metadata_item,
            "error": error,
        },
        ensure_ascii=False,
    )
    conn = await get_outbox_redis()
    await conn.lpush(OUTBOX_QUEUE_KEY, message)
    logger.info(f"Pushed result to outbox: job_id={job_id}, error={error is not None}")
+
+
async def close() -> None:
    """Close the outbox Redis connection and reset the module-level cache."""
    global _redis
    if _redis is not None:
        # aclose() is the non-deprecated close method on redis-py >= 5.
        await _redis.aclose()
        _redis = None
diff --git a/apps/async-worker/async_worker/tasks/__init__.py b/apps/async-worker/async_worker/tasks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/async-worker/async_worker/tasks/scrape.py b/apps/async-worker/async_worker/tasks/scrape.py
new file mode 100644
index 0000000..9282434
--- /dev/null
+++ b/apps/async-worker/async_worker/tasks/scrape.py
@@ -0,0 +1,89 @@
+import uuid
+import traceback
+
+from fastfetchbot_shared.models.url_metadata import UrlMetadata
+from fastfetchbot_shared.services.scrapers.common import InfoExtractService
+from fastfetchbot_shared.utils.logger import logger
+from async_worker.services import outbox, enrichment
+from async_worker.celery_client import celery_app
+from async_worker.config import DOWNLOAD_VIDEO_TIMEOUT
+
+
async def scrape_and_enrich(
    ctx: dict,
    url: str,
    chat_id: int | str,
    job_id: str | None = None,
    message_id: int | None = None,
    source: str = "",
    content_type: str = "",
    store_telegraph: bool | None = None,
    store_document: bool | None = None,
    **kwargs,
) -> dict:
    """ARQ task: scrape a URL, enrich the result, and push to the outbox.

    Args:
        ctx: ARQ worker context.
        url: The URL to scrape.
        chat_id: Telegram chat ID for result delivery.
        job_id: Unique job identifier (generated if not provided).
        message_id: Optional Telegram message ID for reply threading.
        source: URL source platform (e.g. "twitter", "weibo").
        content_type: Content type (e.g. "social_media", "video").
        store_telegraph: Override Telegraph publishing flag.
        store_document: Override PDF export flag.
        **kwargs: Extra arguments passed to the scraper.

    Returns:
        ``{"job_id": ..., "status": "success"}`` on success, or
        ``{"job_id": ..., "status": "error", "error": str}`` on failure
        (after the error has been pushed to the outbox for user notification).
    """
    if job_id is None:
        job_id = str(uuid.uuid4())

    logger.info(f"[{job_id}] Starting scrape: url={url}, source={source}")

    try:
        # Build UrlMetadata and scrape
        url_metadata = UrlMetadata(
            url=url, source=source, content_type=content_type
        )
        service = InfoExtractService(
            url_metadata=url_metadata,
            store_telegraph=False,  # We handle enrichment separately
            store_document=False,
            celery_app=celery_app,
            timeout=DOWNLOAD_VIDEO_TIMEOUT,
            **kwargs,
        )
        metadata_item = await service.get_item()

        # Enrich: Telegraph, PDF
        metadata_item = await enrichment.enrich(
            metadata_item,
            store_telegraph=store_telegraph,
            store_document=store_document,
        )

        logger.info(f"[{job_id}] Scrape completed successfully")

        # Push to outbox
        await outbox.push(
            job_id=job_id,
            chat_id=chat_id,
            message_id=message_id,
            metadata_item=metadata_item,
        )

        return {"job_id": job_id, "status": "success"}

    except Exception as e:
        # logger.exception records the full traceback automatically,
        # replacing the manual traceback.format_exc() round-trip.
        logger.exception(f"[{job_id}] Scrape failed: {e}")

        # Push error to outbox so the bot can notify the user; if this push
        # itself fails, let it propagate to ARQ's retry/result handling.
        await outbox.push(
            job_id=job_id,
            chat_id=chat_id,
            message_id=message_id,
            error=str(e),
        )

        return {"job_id": job_id, "status": "error", "error": str(e)}
diff --git a/apps/async-worker/pyproject.toml b/apps/async-worker/pyproject.toml
new file mode 100644
index 0000000..6c4a00a
--- /dev/null
+++ b/apps/async-worker/pyproject.toml
@@ -0,0 +1,24 @@
[project]
name = "fastfetchbot-async-worker"
version = "0.1.0"
requires-python = ">=3.12,<3.13"
dependencies = [
    "fastfetchbot-shared[scrapers]",
    "arq>=0.26.1",
    "redis[hiredis]>=5.0.0",
    "celery[redis]>=5.4.0",
    "beanie>=1.29.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["async_worker"]

# NOTE(review): `package = false` tells uv to treat this app as a virtual
# (non-installed) workspace member, which leaves the [build-system] and
# hatch sections above unused — confirm whether both are intentional.
[tool.uv]
package = false

[tool.uv.sources]
fastfetchbot-shared = { workspace = true }
diff --git a/apps/telegram-bot/core/config.py b/apps/telegram-bot/core/config.py
index 1a009b2..c637c7f 100644
--- a/apps/telegram-bot/core/config.py
+++ b/apps/telegram-bot/core/config.py
@@ -121,6 +121,14 @@ def ban_list_resolver(ban_list_string: str) -> list:
OPENAI_API_KEY = env.get("OPENAI_API_KEY", None)
GENERAL_SCRAPING_ON = get_env_bool(env, "GENERAL_SCRAPING_ON", False)
+# Scrape mode: "api" (sync via API server) or "queue" (async via ARQ worker)
+SCRAPE_MODE = env.get("SCRAPE_MODE", "queue")
+
+# Redis URLs for queue mode
+ARQ_REDIS_URL = env.get("ARQ_REDIS_URL", "redis://localhost:6379/2")
+OUTBOX_REDIS_URL = env.get("OUTBOX_REDIS_URL", "redis://localhost:6379/3")
+OUTBOX_QUEUE_KEY = env.get("OUTBOX_QUEUE_KEY", "scrape:outbox")
+
# Database configuration
ITEM_DATABASE_ON = get_env_bool(env, "ITEM_DATABASE_ON", False)
MONGODB_PORT = int(env.get("MONGODB_PORT", 27017)) or 27017
diff --git a/apps/telegram-bot/core/handlers/buttons.py b/apps/telegram-bot/core/handlers/buttons.py
index d3a24c1..c550d1e 100644
--- a/apps/telegram-bot/core/handlers/buttons.py
+++ b/apps/telegram-bot/core/handlers/buttons.py
@@ -13,6 +13,7 @@
from fastfetchbot_shared.utils.logger import logger
from core.config import (
TELEGRAM_CHANNEL_ID,
+ SCRAPE_MODE,
)
@@ -57,24 +58,40 @@ async def buttons_process(update: Update, context: CallbackContext) -> None:
chat_id = query.message.chat_id
if data["type"] == "video":
await query.answer("Video processing...")
- replying_message = await query.message.reply_text(
- text=f"Item processing...",
- )
extra_args = data["extra_args"] if "extra_args" in data else {}
- metadata_item = await api_client.get_item(
- url=data["url"], **extra_args
- )
- await replying_message.edit_text(
- text=f"Item processed. Sending to the target...",
- )
- if data["type"] == "force":
- metadata_item["message_type"] = MessageType.SHORT
- await send_item_message(metadata_item, chat_id=chat_id)
- if data["type"] == "channel":
- await query.message.reply_text(
- text=f"Item sent to the channel.",
+
+ if SCRAPE_MODE == "queue":
+ from core import queue_client
+
+ replying_message = await query.message.reply_text(
+ text=f"Item queued for processing...",
+ )
+ await queue_client.enqueue_scrape(
+ url=data["url"],
+ chat_id=chat_id,
+ source=data.get("source", ""),
+ content_type=data.get("content_type", ""),
+ **extra_args,
)
- await replying_message.delete()
+ await replying_message.delete()
+ else:
+ replying_message = await query.message.reply_text(
+ text=f"Item processing...",
+ )
+ metadata_item = await api_client.get_item(
+ url=data["url"], **extra_args
+ )
+ await replying_message.edit_text(
+ text=f"Item processed. Sending to the target...",
+ )
+ if data["type"] == "force":
+ metadata_item["message_type"] = MessageType.SHORT
+ await send_item_message(metadata_item, chat_id=chat_id)
+ if data["type"] == "channel":
+ await query.message.reply_text(
+ text=f"Item sent to the channel.",
+ )
+ await replying_message.delete()
await query.message.delete()
context.drop_callback_data(query)
diff --git a/apps/telegram-bot/core/handlers/url_process.py b/apps/telegram-bot/core/handlers/url_process.py
index 1f33db2..2d54efc 100644
--- a/apps/telegram-bot/core/handlers/url_process.py
+++ b/apps/telegram-bot/core/handlers/url_process.py
@@ -7,7 +7,6 @@
CallbackContext,
)
-from core import api_client
from core.services.message_sender import send_item_message
from core.services.user_settings import get_auto_fetch_in_dm
from fastfetchbot_shared.utils.config import SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS
@@ -20,9 +19,66 @@
FILE_EXPORTER_ON,
OPENAI_API_KEY,
GENERAL_SCRAPING_ON,
+ SCRAPE_MODE,
)
+async def _get_url_metadata(url: str, ban_list: list | None = None) -> dict:
+ """Resolve URL metadata via API or shared library depending on SCRAPE_MODE.
+
+ In API mode: calls the API server's /scraper/getUrlMetadata endpoint.
+ In queue mode: calls the shared library's get_url_metadata directly
+ (pure URL parsing, no network call needed).
+ """
+ if SCRAPE_MODE == "queue":
+ from fastfetchbot_shared.utils.parse import get_url_metadata as shared_get_url_metadata
+
+ url_metadata = await shared_get_url_metadata(url, ban_list=ban_list)
+ return url_metadata.to_dict()
+ else:
+ from core import api_client
+
+ return await api_client.get_url_metadata(url, ban_list=ban_list)
+
+
+async def _fetch_and_send(
+ url: str,
+ chat_id: int | str,
+ message_id: int | None = None,
+ source: str = "",
+ content_type: str = "",
+ message=None,
+ **kwargs,
+) -> None:
+ """Fetch an item via API or queue depending on SCRAPE_MODE.
+
+ Args:
+ url: The resolved URL to scrape.
+ chat_id: Target chat for the result.
+ message_id: Optional message ID for reply threading.
+ source: Pre-resolved source platform (e.g. "twitter").
+ content_type: Pre-resolved content type (e.g. "social_media").
+ message: Optional telegram Message for reply context.
+ **kwargs: Extra arguments passed to the scraper.
+ """
+ if SCRAPE_MODE == "queue":
+ from core import queue_client
+
+ await queue_client.enqueue_scrape(
+ url=url,
+ chat_id=chat_id,
+ message_id=message_id,
+ source=source,
+ content_type=content_type,
+ **kwargs,
+ )
+ else:
+ from core import api_client
+
+ metadata_item = await api_client.get_item(url=url, **kwargs)
+ await send_item_message(metadata_item, chat_id=chat_id, message=message)
+
+
async def https_url_process(update: Update, context: CallbackContext) -> None:
message = update.message
@@ -41,7 +97,7 @@ async def https_url_process(update: Update, context: CallbackContext) -> None:
process_message = await message.reply_text(
text=f"Processing the {i + 1}th url...",
)
- url_metadata = await api_client.get_url_metadata(url, ban_list=TELEGRAM_BOT_MESSAGE_BAN_LIST)
+ url_metadata = await _get_url_metadata(url, ban_list=TELEGRAM_BOT_MESSAGE_BAN_LIST)
if url_metadata["source"] == "banned":
await process_message.edit_text(
text=f"For the {i + 1} th url, the url is banned."
@@ -52,9 +108,11 @@ async def https_url_process(update: Update, context: CallbackContext) -> None:
await process_message.edit_text(
text=f"Uncategorized url found. General webpage parser is on, Processing..."
)
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
await process_message.edit_text(
text=f"For the {i + 1} th url, no supported url found."
@@ -210,26 +268,32 @@ async def _auto_fetch_urls(message) -> None:
"""Auto-fetch all URLs in a DM message without showing action buttons."""
url_dict = message.parse_entities(types=["url"])
for i, url in enumerate(url_dict.values()):
- url_metadata = await api_client.get_url_metadata(
+ url_metadata = await _get_url_metadata(
url, ban_list=TELEGRAM_BOT_MESSAGE_BAN_LIST
)
if url_metadata["source"] == "unknown" and GENERAL_SCRAPING_ON:
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
elif url_metadata["source"] == "unknown" or url_metadata["source"] == "banned":
logger.debug(f"for the {i + 1}th url {url}, no supported url found.")
continue
if url_metadata.get("source") in SOCIAL_MEDIA_WEBSITE_PATTERNS.keys():
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
if url_metadata.get("source") in VIDEO_WEBSITE_PATTERNS.keys():
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
@@ -237,24 +301,33 @@ async def https_url_auto_process(update: Update, context: CallbackContext) -> No
message = update.message
url_dict = message.parse_entities(types=["url"])
for i, url in enumerate(url_dict.values()):
- url_metadata = await api_client.get_url_metadata(
+ url_metadata = await _get_url_metadata(
url, ban_list=TELEGRAM_GROUP_MESSAGE_BAN_LIST
)
if url_metadata["source"] == "unknown" and GENERAL_SCRAPING_ON:
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id, message=message
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ message=message,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
elif url_metadata["source"] == "unknown" or url_metadata["source"] == "banned":
logger.debug(f"for the {i + 1}th url {url}, no supported url found.")
return
if url_metadata.get("source") in SOCIAL_MEDIA_WEBSITE_PATTERNS.keys():
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id, message=message
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ message=message,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
if url_metadata.get("source") in VIDEO_WEBSITE_PATTERNS.keys():
- metadata_item = await api_client.get_item(url=url_metadata["url"])
- await send_item_message(
- metadata_item, chat_id=message.chat_id, message=message
+ await _fetch_and_send(
+ url=url_metadata["url"],
+ chat_id=message.chat_id,
+ message=message,
+ source=url_metadata.get("source", ""),
+ content_type=url_metadata.get("content_type", ""),
)
diff --git a/apps/telegram-bot/core/queue_client.py b/apps/telegram-bot/core/queue_client.py
new file mode 100644
index 0000000..ee15209
--- /dev/null
+++ b/apps/telegram-bot/core/queue_client.py
@@ -0,0 +1,68 @@
+import uuid
+
+from arq.connections import ArqRedis, create_pool, RedisSettings
+
+from core.config import ARQ_REDIS_URL
+from fastfetchbot_shared.utils.logger import logger
+
+_arq_redis: ArqRedis | None = None
+
+
+def _parse_redis_url(url: str) -> RedisSettings:
+ """Parse a redis:// URL into ARQ RedisSettings."""
+ from urllib.parse import urlparse
+
+ parsed = urlparse(url)
+ return RedisSettings(
+ host=parsed.hostname or "localhost",
+ port=parsed.port or 6379,
+ database=int(parsed.path.lstrip("/") or 0),
+ password=parsed.password,
+ )
+
+
+async def init() -> None:
+ """Initialize the ARQ Redis connection pool."""
+ global _arq_redis
+ if _arq_redis is None:
+ _arq_redis = await create_pool(_parse_redis_url(ARQ_REDIS_URL))
+ logger.info("ARQ queue client initialized")
+
+
+async def close() -> None:
+ """Close the ARQ Redis connection pool."""
+ global _arq_redis
+ if _arq_redis is not None:
+ await _arq_redis.aclose()
+ _arq_redis = None
+ logger.info("ARQ queue client closed")
+
+
+async def enqueue_scrape(
+ url: str,
+ chat_id: int | str,
+ message_id: int | None = None,
+ source: str = "",
+ content_type: str = "",
+ **kwargs,
+) -> str:
+ """Enqueue a scrape-and-enrich job to the ARQ worker.
+
+ Returns the job_id (UUID string).
+ """
+ if _arq_redis is None:
+ raise RuntimeError("Queue client not initialized. Call queue_client.init() first.")
+
+ job_id = str(uuid.uuid4())
+ await _arq_redis.enqueue_job(
+ "scrape_and_enrich",
+ url=url,
+ chat_id=chat_id,
+ job_id=job_id,
+ message_id=message_id,
+ source=source,
+ content_type=content_type,
+ **kwargs,
+ )
+ logger.info(f"Enqueued scrape job: job_id={job_id}, url={url}")
+ return job_id
diff --git a/apps/telegram-bot/core/services/bot_app.py b/apps/telegram-bot/core/services/bot_app.py
index 5ef9624..a2be308 100644
--- a/apps/telegram-bot/core/services/bot_app.py
+++ b/apps/telegram-bot/core/services/bot_app.py
@@ -30,6 +30,7 @@
TELEBOT_READ_TIMEOUT,
TELEBOT_WRITE_TIMEOUT,
TELEBOT_MAX_RETRY,
+ SCRAPE_MODE,
)
from core.handlers.url_process import https_url_process, https_url_auto_process
@@ -130,6 +131,15 @@ async def startup() -> None:
BotCommand("settings", "Customize bot behavior"),
]
)
+ # Initialize queue mode if enabled
+ if SCRAPE_MODE == "queue":
+ from core import queue_client
+ from core.services import outbox_consumer
+
+ await queue_client.init()
+ await outbox_consumer.start()
+ logger.info("Queue mode enabled: ARQ client and outbox consumer started")
+
if application.post_init:
await application.post_init()
await application.start()
@@ -178,6 +188,15 @@ async def show_bot_info() -> None:
async def shutdown() -> None:
+ # Shut down queue mode resources
+ if SCRAPE_MODE == "queue":
+ from core import queue_client
+ from core.services import outbox_consumer
+
+ await outbox_consumer.stop()
+ await queue_client.close()
+ logger.info("Queue mode resources shut down")
+
if application.updater and application.updater.running:
await application.updater.stop()
await application.stop()
diff --git a/apps/telegram-bot/core/services/outbox_consumer.py b/apps/telegram-bot/core/services/outbox_consumer.py
new file mode 100644
index 0000000..ec203ca
--- /dev/null
+++ b/apps/telegram-bot/core/services/outbox_consumer.py
@@ -0,0 +1,99 @@
+import asyncio
+import json
+
+import redis.asyncio as aioredis
+
+from core.config import OUTBOX_REDIS_URL, OUTBOX_QUEUE_KEY
+from core.services.message_sender import send_item_message
+from fastfetchbot_shared.utils.logger import logger
+
+_redis: aioredis.Redis | None = None
+_consumer_task: asyncio.Task | None = None
+
+
+async def _get_redis() -> aioredis.Redis:
+ """Get or create the outbox Redis connection."""
+ global _redis
+ if _redis is None:
+ _redis = aioredis.from_url(OUTBOX_REDIS_URL, decode_responses=True)
+ return _redis
+
+
+async def _consume_loop() -> None:
+ """Background loop: BRPOP from the outbox queue and dispatch results."""
+ r = await _get_redis()
+ logger.info(f"Outbox consumer started, listening on '{OUTBOX_QUEUE_KEY}'")
+
+ while True:
+ try:
+ # BRPOP blocks until a message is available (timeout=0 means block forever)
+ result = await r.brpop(OUTBOX_QUEUE_KEY, timeout=0)
+ if result is None:
+ continue
+
+ _, raw_payload = result
+ payload = json.loads(raw_payload)
+
+ job_id = payload.get("job_id", "unknown")
+ chat_id = payload.get("chat_id")
+ error = payload.get("error")
+
+ if error:
+ logger.warning(f"[{job_id}] Scrape failed: {error}")
+ await _send_error_to_chat(chat_id, error)
+ else:
+ metadata_item = payload.get("metadata_item")
+ if metadata_item and chat_id:
+ logger.info(f"[{job_id}] Delivering result to chat {chat_id}")
+ await send_item_message(metadata_item, chat_id=chat_id)
+ else:
+ logger.warning(f"[{job_id}] Invalid payload: missing metadata_item or chat_id")
+
+ except asyncio.CancelledError:
+ logger.info("Outbox consumer cancelled, shutting down")
+ break
+ except Exception as e:
+ logger.error(f"Outbox consumer error: {e}")
+ # Brief pause before retrying to avoid tight error loops
+ await asyncio.sleep(1)
+
+
+async def _send_error_to_chat(chat_id: int | str, error: str) -> None:
+ """Send an error notification to the user's chat."""
+ try:
+ from core.services.bot_app import application
+
+ await application.bot.send_message(
+ chat_id=chat_id,
+ text=f"Sorry, an error occurred while processing your request:\n\n{error}",
+ )
+ except Exception as e:
+ logger.error(f"Failed to send error message to chat {chat_id}: {e}")
+
+
+async def start() -> None:
+ """Start the outbox consumer as a background asyncio task."""
+ global _consumer_task
+ if _consumer_task is not None:
+ logger.warning("Outbox consumer already running")
+ return
+ _consumer_task = asyncio.create_task(_consume_loop())
+ logger.info("Outbox consumer task created")
+
+
+async def stop() -> None:
+ """Stop the outbox consumer and close the Redis connection."""
+ global _consumer_task, _redis
+
+ if _consumer_task is not None:
+ _consumer_task.cancel()
+ try:
+ await _consumer_task
+ except asyncio.CancelledError:
+ pass
+ _consumer_task = None
+ logger.info("Outbox consumer task stopped")
+
+ if _redis is not None:
+ await _redis.aclose()
+ _redis = None
diff --git a/apps/telegram-bot/pyproject.toml b/apps/telegram-bot/pyproject.toml
index e4efff5..5f1c162 100644
--- a/apps/telegram-bot/pyproject.toml
+++ b/apps/telegram-bot/pyproject.toml
@@ -11,6 +11,8 @@ dependencies = [
"jinja2>=3.1.6",
"beanie>=1.29.0",
"aiofiles>=24.1.0",
+ "arq>=0.26.1",
+ "redis[hiredis]>=5.0.0",
]
[build-system]
diff --git a/apps/worker/celerybeat-schedule.db b/apps/worker/celerybeat-schedule.db
index e69de29..ed42f6f 100644
Binary files a/apps/worker/celerybeat-schedule.db and b/apps/worker/celerybeat-schedule.db differ
diff --git a/docker-compose.template.yml b/docker-compose.template.yml
index 05837c2..fd79123 100644
--- a/docker-compose.template.yml
+++ b/docker-compose.template.yml
@@ -33,11 +33,15 @@ services:
- .env
environment:
- API_SERVER_URL=http://api:10450
+ - SCRAPE_MODE=queue
+ - ARQ_REDIS_URL=redis://redis:6379/2
+ - OUTBOX_REDIS_URL=redis://redis:6379/3
ports:
- 10451:10451
depends_on:
- api
- telegram-bot-api
+ - redis
# - postgres # Uncomment when using PostgreSQL for user settings
telegram-bot-api:
@@ -82,6 +86,25 @@ services:
ports:
- 6379:6379
+ async-worker:
+ image: ghcr.io/aturret/fastfetchbot-async-worker:latest
+ # build:
+ # context: .
+ # dockerfile: apps/async-worker/Dockerfile
+ container_name: fastfetchbot-async-worker
+ restart: always
+ env_file:
+ - .env
+ environment:
+ - ARQ_REDIS_URL=redis://redis:6379/2
+ - OUTBOX_REDIS_URL=redis://redis:6379/3
+ - CELERY_BROKER_URL=redis://redis:6379/0
+ - CELERY_RESULT_BACKEND=redis://redis:6379/1
+ volumes:
+ - ./conf:/app/conf
+ depends_on:
+ - redis
+
worker:
image: ghcr.io/aturret/fastfetchbot-worker:latest
# build:
diff --git a/packages/shared/fastfetchbot_shared/services/file_export/__init__.py b/packages/shared/fastfetchbot_shared/services/file_export/__init__.py
new file mode 100644
index 0000000..cc47838
--- /dev/null
+++ b/packages/shared/fastfetchbot_shared/services/file_export/__init__.py
@@ -0,0 +1,6 @@
+"""Async file export services: PDF export, video download, audio transcription.
+
+These are async Celery task wrappers that accept a celery_app and timeout
+as constructor parameters — no app-specific config imports. Each app
+(API server, async worker) injects its own Celery client and timeout.
+"""
diff --git a/packages/shared/fastfetchbot_shared/services/file_export/audio_transcribe.py b/packages/shared/fastfetchbot_shared/services/file_export/audio_transcribe.py
new file mode 100644
index 0000000..f86f3b5
--- /dev/null
+++ b/packages/shared/fastfetchbot_shared/services/file_export/audio_transcribe.py
@@ -0,0 +1,44 @@
+"""Async audio transcription via Celery task submission.
+
+This module wraps the synchronous transcribe logic with an async interface
+that submits work to a Celery worker and awaits the result. The Celery app
+and timeout are injected — no app-specific config imports.
+"""
+
+import asyncio
+
+from fastfetchbot_shared.utils.logger import logger
+
+
+class AudioTranscribe:
+ """Async audio transcription that submits a Celery task and awaits the result.
+
+ Args:
+ audio_file: Path to the audio file to transcribe.
+ celery_app: A Celery application instance for task submission.
+ timeout: Timeout in seconds for the Celery task. Default: 600.
+ """
+
+ def __init__(self, audio_file: str, celery_app, timeout: int = 600):
+ self.audio_file = audio_file
+ self.celery_app = celery_app
+ self.timeout = timeout
+
+ async def transcribe(self) -> str:
+ """Submit transcription task to Celery and return the transcript text."""
+ logger.info(f"Submitting transcribe task: {self.audio_file}")
+ result = self.celery_app.send_task(
+ "file_export.transcribe",
+ kwargs={"audio_file": self.audio_file},
+ )
+ try:
+ response = await asyncio.to_thread(
+ result.get, timeout=int(self.timeout)
+ )
+ return response["transcript"]
+ except Exception:
+ logger.exception(
+ f"file_export.transcribe task failed: "
+ f"audio_file={self.audio_file}, timeout={self.timeout}"
+ )
+ raise
diff --git a/packages/shared/fastfetchbot_shared/services/file_export/pdf_export.py b/packages/shared/fastfetchbot_shared/services/file_export/pdf_export.py
new file mode 100644
index 0000000..0419b11
--- /dev/null
+++ b/packages/shared/fastfetchbot_shared/services/file_export/pdf_export.py
@@ -0,0 +1,79 @@
+"""Async PDF export via Celery task submission.
+
+This module wraps the synchronous pdf_export logic with an async interface
+that submits work to a Celery worker and awaits the result. The Celery app
+and timeout are injected — no app-specific config imports.
+"""
+
+import asyncio
+import uuid
+
+from bs4 import BeautifulSoup
+from fastfetchbot_shared.utils.logger import logger
+
+
+class PdfExport:
+ """Async PDF export that submits a Celery task and awaits the result.
+
+ Args:
+ title: Document title (used for output filename).
+ html_string: HTML content to convert to PDF.
+ celery_app: A Celery application instance for task submission.
+ timeout: Timeout in seconds for the Celery task. Default: 600.
+ """
+
+ def __init__(
+ self,
+ title: str,
+ html_string: str,
+ celery_app,
+ timeout: int = 600,
+ ):
+ self.title = title
+ self.html_string = html_string
+ self.celery_app = celery_app
+ self.timeout = timeout
+
+ async def export(self) -> str:
+ """Submit PDF export task to Celery and return the output filename."""
+ html_string = wrap_html_string(self.html_string)
+ output_filename = f"{self.title}-{uuid.uuid4()}.pdf"
+
+ logger.info(f"Submitting pdf export task: {output_filename}")
+ result = self.celery_app.send_task(
+ "file_export.pdf_export",
+ kwargs={
+ "html_string": html_string,
+ "output_filename": output_filename,
+ },
+ )
+ try:
+ response = await asyncio.to_thread(
+ result.get, timeout=int(self.timeout)
+ )
+ output_filename = response["output_filename"]
+ except Exception:
+ logger.exception(
+ f"file_export.pdf_export task failed: "
+ f"output_filename={output_filename}, timeout={self.timeout}"
+ )
+ raise
+
+ logger.info(f"PDF export success: {output_filename}")
+ return output_filename
+
+
+def wrap_html_string(html_string: str) -> str:
+ """Wrap raw HTML content in a proper document structure and strip inline styles."""
+ soup = BeautifulSoup(
+ ''
+ '',
+ "html.parser",
+ )
+ soup.body.append(BeautifulSoup(html_string, "html.parser"))
+ for tag in soup.find_all(True):
+ if "style" in tag.attrs:
+ del tag["style"]
+ for style_tag in soup.find_all("style"):
+ style_tag.decompose()
+ return soup.prettify()
diff --git a/packages/shared/fastfetchbot_shared/services/file_export/video_download.py b/packages/shared/fastfetchbot_shared/services/file_export/video_download.py
new file mode 100644
index 0000000..5be1735
--- /dev/null
+++ b/packages/shared/fastfetchbot_shared/services/file_export/video_download.py
@@ -0,0 +1,270 @@
+"""Async video download/info extraction via Celery task submission.
+
+This module wraps the synchronous video_download logic with an async interface
+that submits work to a Celery worker and awaits the result. The Celery app
+and timeout are injected — no app-specific config imports.
+"""
+
+import asyncio
+from urllib.parse import urlparse, parse_qs
+
+import httpx
+
+from fastfetchbot_shared.models.metadata_item import MetadataItem, MessageType, MediaFile
+from fastfetchbot_shared.utils.parse import unix_timestamp_to_utc, second_to_time, wrap_text_into_html
+from fastfetchbot_shared.utils.logger import logger
+from fastfetchbot_shared.services.scrapers.config import JINJA2_ENV
+
+video_info_template = JINJA2_ENV.get_template("video_info.jinja2")
+
+
+class VideoDownloader(MetadataItem):
+ """Async video downloader that submits Celery tasks for yt-dlp operations.
+
+ Args:
+ url: Video URL.
+ category: Platform extractor name (e.g. "youtube", "bilibili").
+ celery_app: A Celery application instance for task submission.
+ timeout: Timeout in seconds for the Celery task. Default: 600.
+ data: Optional data dict.
+ download: Whether to download the video file. Default: True.
+ audio_only: Whether to extract audio only. Default: False.
+ hd: Whether to download in HD. Default: False.
+ transcribe: Whether to transcribe the audio. Default: False.
+ """
+
+ def __init__(
+ self,
+ url: str,
+ category: str,
+ celery_app,
+ timeout: int = 600,
+ data=None,
+ download: bool = True,
+ audio_only: bool = False,
+ hd: bool = False,
+ transcribe: bool = False,
+ **kwargs,
+ ):
+ self.extractor = category
+ self.url = url
+ self.author_url = ""
+ self.download = download
+ self.audio_only = audio_only
+ self.transcribe = transcribe
+ self.hd = hd
+ self.message_type = MessageType.SHORT
+ self.file_path = None
+ self.category = category
+ self.media_files = []
+ self.created = None
+ self.duration = None
+ self.celery_app = celery_app
+ self.timeout = timeout
+
+ async def get_item(self) -> dict:
+ self.url = await self._parse_url(self.url)
+ await self.get_video()
+ return self.to_dict()
+
+ async def get_video(self) -> None:
+ content_info = await self.get_video_info()
+ self.file_path = content_info["file_path"]
+ video_info_funcs = {
+ "youtube": self._youtube_info_parse,
+ "bilibili": self._bilibili_info_parse,
+ }
+ meta_info = video_info_funcs[self.extractor](content_info)
+ self._video_info_formatting(meta_info)
+ # AI transcribe
+ if self.transcribe:
+ from fastfetchbot_shared.services.file_export.audio_transcribe import AudioTranscribe
+
+ audio_content_info = await self.get_video_info(audio_only=True)
+ audio_file_path = audio_content_info["file_path"]
+ audio_transcribe = AudioTranscribe(
+ audio_file_path,
+ celery_app=self.celery_app,
+ timeout=self.timeout,
+ )
+ transcribe_text = await audio_transcribe.transcribe()
+ if self.download is False:
+ self.message_type = MessageType.LONG
+ self.text += "\nAI\u5168\u6587\u6458\u5f55\uff1a" + transcribe_text
+                self.content += "<br>" + wrap_text_into_html(transcribe_text)
+
+ async def _parse_url(self, url: str) -> str:
+ async def _get_redirected_url(original_url: str) -> str:
+ async with httpx.AsyncClient(follow_redirects=False) as client:
+ resp = await client.get(original_url)
+ if resp.status_code == 200:
+ original_url = resp.url
+ elif resp.status_code == 302:
+ original_url = resp.headers["Location"]
+ return original_url
+
+ def _remove_youtube_link_tracing(original_url: str) -> str:
+ original_url_parser = urlparse(original_url)
+ original_url_hostname = str(original_url_parser.hostname)
+ if "youtu.be" in original_url_hostname:
+ original_url = original_url.split("?")[0]
+ if "youtube.com" in original_url_hostname:
+ original_url = (
+ original_url_parser.scheme
+ + "://"
+ + original_url_parser.netloc
+ + original_url_parser.path
+ )
+ if original_url_parser.query:
+ v_part_query = [
+ item
+ for item in original_url_parser.query.split("&")
+ if "v=" in item
+ ]
+ if v_part_query:
+ original_url += "?" + v_part_query[0]
+ return original_url
+
+ def _remove_bilibili_link_tracing(original_url: str) -> str:
+ original_url_parser = urlparse(original_url)
+ original_url_hostname = str(original_url_parser.hostname)
+ query_dict = parse_qs(original_url_parser.query)
+ bilibili_p_query_string = (
+ "?p=" + query_dict["p"][0] if "p" in query_dict else ""
+ )
+ if "bilibili.com" in original_url_hostname:
+ original_url = (
+ original_url_parser.scheme
+ + "://"
+ + original_url_parser.netloc
+ + original_url_parser.path
+ )
+ return original_url + bilibili_p_query_string
+
+ logger.info(f"Parsing original video url: {url} for {self.extractor}")
+
+ url_parser = urlparse(url)
+ url_hostname = str(url_parser.hostname)
+
+ if self.extractor == "bilibili":
+ if "b23.tv" in url_hostname:
+ url = await _get_redirected_url(url)
+ if "m.bilibili.com" in url_hostname:
+ url = url.replace("m.bilibili.com", "www.bilibili.com")
+ url = _remove_bilibili_link_tracing(url)
+ elif self.extractor == "youtube":
+ if "youtu.be" in url_hostname:
+ url = await _get_redirected_url(url)
+ url = _remove_youtube_link_tracing(url)
+
+ logger.info(f"Parsed video url: {url} for {self.extractor}")
+ return url
+
+ async def get_video_info(
+ self,
+ url: str = None,
+ download: bool = None,
+ extractor: str = None,
+ audio_only: bool = None,
+ hd: bool = None,
+ ) -> dict:
+ """Submit a Celery task to download/extract video info."""
+ if url is None:
+ url = self.url
+ if download is None:
+ download = self.download
+ if extractor is None:
+ extractor = self.extractor
+ if audio_only is None:
+ audio_only = self.audio_only
+ if hd is None:
+ hd = self.hd
+
+ body = {
+ "url": url,
+ "download": download,
+ "extractor": extractor,
+ "audio_only": audio_only,
+ "hd": hd,
+ }
+ logger.info(f"Submitting video download task: {body}")
+ if download is True:
+ logger.info("Video downloading... it may take a while")
+ if hd is True:
+ logger.info("Downloading HD video, it may take longer")
+ elif audio_only is True:
+ logger.info("Downloading audio only")
+
+ result = self.celery_app.send_task(
+ "file_export.video_download", kwargs=body
+ )
+ try:
+ response = await asyncio.to_thread(
+ result.get, timeout=int(self.timeout)
+ )
+ content_info = response["content_info"]
+ content_info["file_path"] = response["file_path"]
+ return content_info
+ except Exception:
+ logger.exception(
+ f"file_export.video_download task failed: "
+ f"url={url}, extractor={extractor}, timeout={self.timeout}"
+ )
+ raise
+
+ def _video_info_formatting(self, meta_info: dict):
+ self.title = meta_info["title"]
+ self.author = meta_info["author"]
+ self.author_url = meta_info["author_url"]
+ if len(meta_info["description"]) > 800:
+ meta_info["description"] = meta_info["description"][:800] + "..."
+ self.created = meta_info["upload_date"]
+ self.duration = meta_info["duration"]
+ self.text = video_info_template.render(
+ data={
+ "url": self.url,
+ "title": self.title,
+ "author": self.author,
+ "author_url": self.author_url,
+ "duration": self.duration,
+ "created": self.created,
+ "playback_data": meta_info["playback_data"],
+ "description": meta_info["description"],
+ }
+ )
+        self.content = self.text.replace("\n", "<br>")
+ if self.download:
+ media_type = "video"
+ if self.audio_only:
+ media_type = "audio"
+ self.media_files = [MediaFile(media_type, self.file_path, "")]
+
+ @staticmethod
+ def _youtube_info_parse(video_info: dict) -> dict:
+ return {
+ "id": video_info["id"],
+ "title": video_info["title"],
+ "author": video_info["uploader"],
+ "author_url": video_info["uploader_url"] or video_info["channel_url"],
+ "description": video_info["description"],
+ "playback_data": f"\u89c6\u9891\u64ad\u653e\u91cf\uff1a{video_info['view_count']} \u8bc4\u8bba\u6570\uff1a{video_info['comment_count']}",
+ "author_avatar": video_info["thumbnail"],
+ "upload_date": str(video_info["upload_date"]),
+ "duration": second_to_time(round(video_info["duration"])),
+ }
+
+ @staticmethod
+ def _bilibili_info_parse(video_info: dict) -> dict:
+ return {
+ "id": video_info["id"],
+ "title": video_info["title"],
+ "author": video_info["uploader"],
+ "author_url": "https://space.bilibili.com/"
+ + str(video_info["uploader_id"]),
+ "author_avatar": video_info["thumbnail"],
+ "ext": video_info["ext"],
+ "description": video_info["description"],
+ "playback_data": f"\u89c6\u9891\u64ad\u653e\u91cf\uff1a{video_info['view_count']} \u5f39\u5e55\u6570\uff1a{video_info['comment_count']} \u70b9\u8d5e\u6570\uff1a{video_info['like_count']}",
+ "upload_date": unix_timestamp_to_utc(video_info["timestamp"]),
+ "duration": second_to_time(round(video_info["duration"])),
+ }
diff --git a/packages/shared/fastfetchbot_shared/services/scrapers/common.py b/packages/shared/fastfetchbot_shared/services/scrapers/common.py
index d87e078..0e94809 100644
--- a/packages/shared/fastfetchbot_shared/services/scrapers/common.py
+++ b/packages/shared/fastfetchbot_shared/services/scrapers/common.py
@@ -19,10 +19,21 @@
class InfoExtractService(object):
"""Core scraping service — routes URLs to the correct scraper and returns raw metadata.
- This base class handles only scraping. Telegraph publishing, PDF export,
- DB storage, and video download are handled by subclasses (e.g. in the API app).
+ This base class handles scraping for all platforms, including video sites
+ (YouTube, Bilibili) via the shared VideoDownloader. Telegraph publishing,
+ PDF export, and DB storage are handled by subclasses (e.g. in the API app).
+
+ For video platforms, callers must pass ``celery_app`` and optionally
+ ``timeout`` as keyword arguments so the VideoDownloader can submit Celery
+ tasks for yt-dlp operations.
"""
+ @staticmethod
+ def _get_video_downloader():
+ """Lazy import to avoid circular dependency (video_download → scrapers.config → scrapers → common)."""
+ from fastfetchbot_shared.services.file_export.video_download import VideoDownloader
+ return VideoDownloader
+
service_classes: dict = {
"twitter": twitter.Twitter,
"threads": threads.Threads,
@@ -58,6 +69,14 @@ def __init__(
def category(self) -> str:
return self.source
+ def _resolve_scraper_class(self, category: str):
+ """Look up scraper class, falling back to lazy VideoDownloader for video platforms."""
+ if category in self.service_classes:
+ return self.service_classes[category]
+ if category in ("youtube", "bilibili"):
+ return self._get_video_downloader()
+ raise KeyError(f"No scraper registered for category: {category}")
+
async def get_item(self, metadata_item: Optional[dict] = None) -> dict:
if not metadata_item:
try:
@@ -66,8 +85,9 @@ async def get_item(self, metadata_item: Optional[dict] = None) -> dict:
item_data_processor = await ScraperManager.scrapers[self.category].get_processor_by_url(url=self.url)
metadata_item = await item_data_processor.get_item()
else:
- scraper_item = self.service_classes[self.category](
- url=self.url, data=self.data, **self.kwargs
+ scraper_cls = self._resolve_scraper_class(self.category)
+ scraper_item = scraper_cls(
+ url=self.url, category=self.category, data=self.data, **self.kwargs
)
metadata_item = await scraper_item.get_item()
except Exception as e:
diff --git a/packages/shared/fastfetchbot_shared/services/scrapers/weibo/scraper.py b/packages/shared/fastfetchbot_shared/services/scrapers/weibo/scraper.py
index 71215e2..a0dd89e 100644
--- a/packages/shared/fastfetchbot_shared/services/scrapers/weibo/scraper.py
+++ b/packages/shared/fastfetchbot_shared/services/scrapers/weibo/scraper.py
@@ -217,14 +217,14 @@ async def _process_weibo_item(self, weibo_info: dict) -> None:
weibo_item_data["content"] = content
# resolve retweet
if weibo_info.get("retweeted_status"):
- retweeted_weibo_id = (
+ retweeted_weibo_id = str((
weibo_info["retweeted_status"].get("id")
or weibo_info["retweeted_status"].get("mid")
or weibo_info["retweeted_status"].get("idstr")
- )
+ ))
retweeted_weibo_item = WeiboDataProcessor(url=WEIBO_WEB_HOST + retweeted_weibo_id)
retweeted_info = await retweeted_weibo_item.get_item()
- weibo_item_data["text"] += retweeted_info["text"]
+ weibo_item_data["text"] += "\n" + retweeted_info["text"] + "
"
weibo_item_data["content"] += "
" + retweeted_info["content"]
weibo_item_data["media_files"] += retweeted_info["media_files"]
# type check
diff --git a/pyproject.toml b/pyproject.toml
index 3100a79..b0c1f87 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,6 +44,7 @@ dependencies = [
"celery[redis]>=5.4.0,<6.0.0",
"fastfetchbot-shared[postgres]",
"fastfetchbot-file-export",
+ "arq>=0.27.0",
]
[project.optional-dependencies]
diff --git a/template.env b/template.env
index 84ac84c..4d95693 100644
--- a/template.env
+++ b/template.env
@@ -164,3 +164,13 @@ CELERY_BROKER_URL=redis://redis:6379/0
# Redis URL for Celery result backend. Default: `redis://localhost:6379/1`
CELERY_RESULT_BACKEND=redis://redis:6379/1
+
+# Async Scraping Worker (ARQ)
+# Scrape mode: "api" (sync via API server) or "queue" (async via ARQ worker). Default: `api`
+SCRAPE_MODE=api
+
+# Redis URL for ARQ task queue. Default: `redis://localhost:6379/2`
+ARQ_REDIS_URL=redis://redis:6379/2
+
+# Redis URL for the result outbox. Default: `redis://localhost:6379/3`
+OUTBOX_REDIS_URL=redis://redis:6379/3
diff --git a/tests/unit/async_worker/__init__.py b/tests/unit/async_worker/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/async_worker/conftest.py b/tests/unit/async_worker/conftest.py
new file mode 100644
index 0000000..b8206ce
--- /dev/null
+++ b/tests/unit/async_worker/conftest.py
@@ -0,0 +1,7 @@
+import sys
+from pathlib import Path
+
+# Add the async-worker app directory to sys.path so 'async_worker' is importable
+_app_dir = Path(__file__).resolve().parents[3] / "apps" / "async-worker"
+if str(_app_dir) not in sys.path:
+ sys.path.insert(0, str(_app_dir))
diff --git a/tests/unit/async_worker/test_enrichment.py b/tests/unit/async_worker/test_enrichment.py
new file mode 100644
index 0000000..dc7caa9
--- /dev/null
+++ b/tests/unit/async_worker/test_enrichment.py
@@ -0,0 +1,197 @@
+"""Tests for apps/async-worker/async_worker/services/enrichment.py"""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from async_worker.services.enrichment import enrich
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def base_metadata_item():
+ """Minimal metadata item dict for enrichment."""
+ return {
+ "title": " Test Title ",
+ "content": "Test content
",
+ "telegraph_url": "",
+ "media_files": [],
+ "message_type": "short",
+ }
+
+
+@pytest.fixture
+def mock_telegraph():
+ """Patch Telegraph.from_dict and .get_telegraph."""
+ with patch("async_worker.services.enrichment.Telegraph") as MockTg:
+ instance = MagicMock()
+ instance.get_telegraph = AsyncMock(return_value="https://telegra.ph/test-01")
+ MockTg.from_dict.return_value = instance
+ yield MockTg, instance
+
+
+@pytest.fixture
+def mock_pdf_export():
+ """Patch PdfExport in the enrichment module."""
+ with patch(
+ "async_worker.services.enrichment.PdfExport", create=True
+ ) as MockPdf:
+ # The import is lazy, so we patch at the module level where it's imported
+ yield MockPdf
+
+
+# ---------------------------------------------------------------------------
+# Telegraph enrichment
+# ---------------------------------------------------------------------------
+
+
+class TestTelegraphEnrichment:
+ @pytest.mark.asyncio
+ async def test_publishes_to_telegraph_when_enabled(
+ self, base_metadata_item, mock_telegraph
+ ):
+ MockTg, instance = mock_telegraph
+ result = await enrich(base_metadata_item, store_telegraph=True)
+
+ MockTg.from_dict.assert_called_once_with(base_metadata_item)
+ instance.get_telegraph.assert_awaited_once()
+ assert result["telegraph_url"] == "https://telegra.ph/test-01"
+
+ @pytest.mark.asyncio
+ async def test_skips_telegraph_when_disabled(self, base_metadata_item, mock_telegraph):
+ MockTg, instance = mock_telegraph
+ result = await enrich(
+ base_metadata_item, store_telegraph=False, store_document=False
+ )
+
+ MockTg.from_dict.assert_not_called()
+ instance.get_telegraph.assert_not_awaited()
+
+ @pytest.mark.asyncio
+ async def test_telegraph_failure_sets_empty_url(
+ self, base_metadata_item, mock_telegraph
+ ):
+ _, instance = mock_telegraph
+ instance.get_telegraph = AsyncMock(side_effect=RuntimeError("telegraph down"))
+
+ result = await enrich(base_metadata_item, store_telegraph=True)
+ assert result["telegraph_url"] == ""
+
+ @pytest.mark.asyncio
+ async def test_long_message_forces_telegraph(self, base_metadata_item, mock_telegraph):
+ from fastfetchbot_shared.models.metadata_item import MessageType
+
+ base_metadata_item["message_type"] = MessageType.LONG
+ MockTg, instance = mock_telegraph
+
+ result = await enrich(
+ base_metadata_item, store_telegraph=False, store_document=False
+ )
+
+ # Should have been forced to True despite store_telegraph=False
+ MockTg.from_dict.assert_called_once()
+ instance.get_telegraph.assert_awaited_once()
+
+
+# ---------------------------------------------------------------------------
+# PDF enrichment
+# ---------------------------------------------------------------------------
+
+
+class TestPdfEnrichment:
+ @pytest.mark.asyncio
+ async def test_exports_pdf_when_store_document_true(self, base_metadata_item):
+ with patch(
+ "fastfetchbot_shared.services.file_export.pdf_export.PdfExport"
+ ) as MockPdf:
+ mock_instance = AsyncMock()
+ mock_instance.export = AsyncMock(return_value="/tmp/test.pdf")
+ MockPdf.return_value = mock_instance
+
+ result = await enrich(
+ base_metadata_item,
+ store_telegraph=False,
+ store_document=True,
+ )
+
+ assert any(
+ f["url"] == "/tmp/test.pdf" for f in result["media_files"]
+ )
+
+ @pytest.mark.asyncio
+ async def test_pdf_fallback_when_telegraph_fails(self, base_metadata_item, mock_telegraph):
+ _, instance = mock_telegraph
+ instance.get_telegraph = AsyncMock(side_effect=RuntimeError("fail"))
+
+ with patch(
+ "fastfetchbot_shared.services.file_export.pdf_export.PdfExport"
+ ) as MockPdf:
+ mock_pdf_instance = AsyncMock()
+ mock_pdf_instance.export = AsyncMock(return_value="/tmp/fallback.pdf")
+ MockPdf.return_value = mock_pdf_instance
+
+ result = await enrich(
+ base_metadata_item,
+ store_telegraph=True,
+ store_document=False,
+ )
+
+ # telegraph_url is "" so PDF should trigger as fallback
+ assert result["telegraph_url"] == ""
+ assert any(
+ f["url"] == "/tmp/fallback.pdf" for f in result["media_files"]
+ )
+
+ @pytest.mark.asyncio
+ async def test_pdf_failure_does_not_crash(self, base_metadata_item):
+ with patch(
+ "fastfetchbot_shared.services.file_export.pdf_export.PdfExport"
+ ) as MockPdf:
+ mock_instance = AsyncMock()
+ mock_instance.export = AsyncMock(side_effect=RuntimeError("pdf boom"))
+ MockPdf.return_value = mock_instance
+
+ # Should not raise
+ result = await enrich(
+ base_metadata_item,
+ store_telegraph=False,
+ store_document=True,
+ )
+ assert result["media_files"] == []
+
+
+# ---------------------------------------------------------------------------
+# Title stripping
+# ---------------------------------------------------------------------------
+
+
+class TestTitleStripping:
+ @pytest.mark.asyncio
+ async def test_strips_title_whitespace(self, base_metadata_item):
+ base_metadata_item["title"] = " padded title "
+ result = await enrich(
+ base_metadata_item, store_telegraph=False, store_document=False
+ )
+ assert result["title"] == "padded title"
+
+
+# ---------------------------------------------------------------------------
+# Config defaults
+# ---------------------------------------------------------------------------
+
+
+class TestConfigDefaults:
+ @pytest.mark.asyncio
+ async def test_uses_config_defaults_when_none(self, base_metadata_item, mock_telegraph):
+ """When store_telegraph/store_document are None, config defaults should be used."""
+ with patch("async_worker.services.enrichment.STORE_TELEGRAPH", True), \
+ patch("async_worker.services.enrichment.STORE_DOCUMENT", False):
+ result = await enrich(base_metadata_item)
+
+ # STORE_TELEGRAPH=True means Telegraph should be called
+ MockTg, instance = mock_telegraph
+ MockTg.from_dict.assert_called_once()
diff --git a/tests/unit/async_worker/test_main.py b/tests/unit/async_worker/test_main.py
new file mode 100644
index 0000000..be97103
--- /dev/null
+++ b/tests/unit/async_worker/test_main.py
@@ -0,0 +1,69 @@
+"""Tests for apps/async-worker/async_worker/main.py — parse_redis_url and WorkerSettings."""
+
+import pytest
+
+from async_worker.main import parse_redis_url
+
+
+# ---------------------------------------------------------------------------
+# parse_redis_url
+# ---------------------------------------------------------------------------
+
+
+class TestParseRedisUrl:
+ def test_default_url(self):
+ settings = parse_redis_url("redis://localhost:6379/0")
+ assert settings.host == "localhost"
+ assert settings.port == 6379
+ assert settings.database == 0
+ assert settings.password is None
+
+ def test_custom_db(self):
+ settings = parse_redis_url("redis://localhost:6379/2")
+ assert settings.database == 2
+
+ def test_custom_host_and_port(self):
+ settings = parse_redis_url("redis://myhost:7000/5")
+ assert settings.host == "myhost"
+ assert settings.port == 7000
+ assert settings.database == 5
+
+ def test_with_password(self):
+ settings = parse_redis_url("redis://:secret@redis.example.com:6380/3")
+ assert settings.host == "redis.example.com"
+ assert settings.port == 6380
+ assert settings.database == 3
+ assert settings.password == "secret"
+
+ def test_no_path_defaults_to_db_0(self):
+ settings = parse_redis_url("redis://localhost:6379")
+ assert settings.database == 0
+
+ def test_empty_path_defaults_to_db_0(self):
+ settings = parse_redis_url("redis://localhost:6379/")
+ assert settings.database == 0
+
+ def test_no_port_defaults_to_6379(self):
+ settings = parse_redis_url("redis://myhost/1")
+ assert settings.port == 6379
+ assert settings.database == 1
+
+
+# ---------------------------------------------------------------------------
+# WorkerSettings
+# ---------------------------------------------------------------------------
+
+
+class TestWorkerSettings:
+ def test_worker_settings_attributes(self):
+ from async_worker.main import WorkerSettings
+
+ assert WorkerSettings.job_timeout == 600
+ assert WorkerSettings.max_jobs == 10
+ assert WorkerSettings.keep_result == 3600
+
+ def test_worker_settings_has_scrape_function(self):
+ from async_worker.main import WorkerSettings
+ from async_worker.tasks.scrape import scrape_and_enrich
+
+ assert scrape_and_enrich in WorkerSettings.functions
diff --git a/tests/unit/async_worker/test_outbox.py b/tests/unit/async_worker/test_outbox.py
new file mode 100644
index 0000000..37f29e6
--- /dev/null
+++ b/tests/unit/async_worker/test_outbox.py
@@ -0,0 +1,171 @@
+"""Tests for apps/async-worker/async_worker/services/outbox.py"""
+
+import json
+from unittest.mock import AsyncMock, patch, MagicMock
+
+import pytest
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture(autouse=True)
+def reset_outbox_module():
+ """Reset module-level global state before each test."""
+ import async_worker.services.outbox as outbox_mod
+
+ outbox_mod._redis = None
+ yield
+ outbox_mod._redis = None
+
+
+@pytest.fixture
+def mock_redis():
+ """Create a mock async Redis instance."""
+ r = AsyncMock()
+ r.lpush = AsyncMock()
+ r.aclose = AsyncMock()
+ return r
+
+
+# ---------------------------------------------------------------------------
+# get_outbox_redis
+# ---------------------------------------------------------------------------
+
+
+class TestGetOutboxRedis:
+ @pytest.mark.asyncio
+ async def test_creates_connection_on_first_call(self, mock_redis):
+ with patch(
+ "async_worker.services.outbox.aioredis.from_url",
+ return_value=mock_redis,
+ ) as mock_from_url:
+ from async_worker.services.outbox import get_outbox_redis
+
+ r = await get_outbox_redis()
+ assert r is mock_redis
+ mock_from_url.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_returns_same_instance_on_second_call(self, mock_redis):
+ with patch(
+ "async_worker.services.outbox.aioredis.from_url",
+ return_value=mock_redis,
+ ) as mock_from_url:
+ from async_worker.services.outbox import get_outbox_redis
+
+ r1 = await get_outbox_redis()
+ r2 = await get_outbox_redis()
+ assert r1 is r2
+ assert mock_from_url.call_count == 1
+
+
+# ---------------------------------------------------------------------------
+# push
+# ---------------------------------------------------------------------------
+
+
+class TestPush:
+ @pytest.mark.asyncio
+ async def test_push_metadata_item(self, mock_redis):
+ with patch(
+ "async_worker.services.outbox.aioredis.from_url",
+ return_value=mock_redis,
+ ):
+ from async_worker.services.outbox import push
+
+ await push(
+ job_id="j1",
+ chat_id=12345,
+ metadata_item={"title": "Test", "content": "hi"},
+ message_id=99,
+ )
+
+ mock_redis.lpush.assert_awaited_once()
+ args = mock_redis.lpush.call_args
+ queue_key = args[0][0]
+ payload = json.loads(args[0][1])
+
+ assert payload["job_id"] == "j1"
+ assert payload["chat_id"] == 12345
+ assert payload["message_id"] == 99
+ assert payload["metadata_item"] == {"title": "Test", "content": "hi"}
+ assert payload["error"] is None
+
+ @pytest.mark.asyncio
+ async def test_push_error(self, mock_redis):
+ with patch(
+ "async_worker.services.outbox.aioredis.from_url",
+ return_value=mock_redis,
+ ):
+ from async_worker.services.outbox import push
+
+ await push(
+ job_id="j2",
+ chat_id=42,
+ error="something broke",
+ )
+
+ payload = json.loads(mock_redis.lpush.call_args[0][1])
+ assert payload["error"] == "something broke"
+ assert payload["metadata_item"] is None
+
+ @pytest.mark.asyncio
+ async def test_push_unicode_content(self, mock_redis):
+ with patch(
+ "async_worker.services.outbox.aioredis.from_url",
+ return_value=mock_redis,
+ ):
+ from async_worker.services.outbox import push
+
+ await push(
+ job_id="j3",
+ chat_id=1,
+ metadata_item={"title": "\u4e2d\u6587\u6807\u9898", "emoji": "\U0001f600"},
+ )
+
+ raw = mock_redis.lpush.call_args[0][1]
+ # ensure_ascii=False means unicode should be preserved
+ assert "\u4e2d\u6587\u6807\u9898" in raw
+ assert "\U0001f600" in raw
+
+ @pytest.mark.asyncio
+ async def test_push_without_message_id(self, mock_redis):
+ with patch(
+ "async_worker.services.outbox.aioredis.from_url",
+ return_value=mock_redis,
+ ):
+ from async_worker.services.outbox import push
+
+ await push(job_id="j4", chat_id=1)
+
+ payload = json.loads(mock_redis.lpush.call_args[0][1])
+ assert payload["message_id"] is None
+
+
+# ---------------------------------------------------------------------------
+# close
+# ---------------------------------------------------------------------------
+
+
+class TestClose:
+ @pytest.mark.asyncio
+ async def test_close_when_connected(self, mock_redis):
+ import async_worker.services.outbox as outbox_mod
+
+ outbox_mod._redis = mock_redis
+ await outbox_mod.close()
+
+ mock_redis.aclose.assert_awaited_once()
+ assert outbox_mod._redis is None
+
+ @pytest.mark.asyncio
+ async def test_close_when_not_connected(self):
+ import async_worker.services.outbox as outbox_mod
+
+ outbox_mod._redis = None
+ # Should not raise
+ await outbox_mod.close()
+ assert outbox_mod._redis is None
diff --git a/tests/unit/async_worker/test_scrape_task.py b/tests/unit/async_worker/test_scrape_task.py
new file mode 100644
index 0000000..64b5f8b
--- /dev/null
+++ b/tests/unit/async_worker/test_scrape_task.py
@@ -0,0 +1,216 @@
+"""Tests for apps/async-worker/async_worker/tasks/scrape.py"""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+import uuid
+
+import pytest
+
+from async_worker.tasks.scrape import scrape_and_enrich
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def ctx():
+ """ARQ worker context dict."""
+ return {"redis": MagicMock()}
+
+
+@pytest.fixture
+def mock_info_extract():
+ """Patch InfoExtractService in the scrape module."""
+ with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls:
+ instance = AsyncMock()
+ instance.get_item = AsyncMock(
+ return_value={"title": "Test", "content": "hi
", "media_files": []}
+ )
+ MockCls.return_value = instance
+ yield MockCls, instance
+
+
+@pytest.fixture
+def mock_enrichment():
+ """Patch enrichment.enrich in the scrape module."""
+ with patch("async_worker.tasks.scrape.enrichment") as mock_mod:
+ mock_mod.enrich = AsyncMock(
+ return_value={
+ "title": "Test",
+ "content": "hi
",
+ "media_files": [],
+ "telegraph_url": "https://telegra.ph/test",
+ }
+ )
+ yield mock_mod
+
+
+@pytest.fixture
+def mock_outbox():
+ """Patch outbox.push in the scrape module."""
+ with patch("async_worker.tasks.scrape.outbox") as mock_mod:
+ mock_mod.push = AsyncMock()
+ yield mock_mod
+
+
+# ---------------------------------------------------------------------------
+# Success path
+# ---------------------------------------------------------------------------
+
+
+class TestScrapeAndEnrichSuccess:
+ @pytest.mark.asyncio
+ async def test_returns_success(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ result = await scrape_and_enrich(
+ ctx,
+ url="https://twitter.com/user/status/1",
+ chat_id=12345,
+ source="twitter",
+ content_type="social_media",
+ )
+ assert result["status"] == "success"
+ assert "job_id" in result
+
+ @pytest.mark.asyncio
+ async def test_generates_job_id_when_not_provided(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ result = await scrape_and_enrich(ctx, url="u", chat_id=1)
+ # Should be a valid UUID
+ uuid.UUID(result["job_id"])
+
+ @pytest.mark.asyncio
+ async def test_uses_provided_job_id(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ result = await scrape_and_enrich(
+ ctx, url="u", chat_id=1, job_id="my-custom-id"
+ )
+ assert result["job_id"] == "my-custom-id"
+
+ @pytest.mark.asyncio
+ async def test_creates_url_metadata(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ MockCls, _ = mock_info_extract
+ await scrape_and_enrich(
+ ctx,
+ url="https://example.com",
+ chat_id=1,
+ source="reddit",
+ content_type="post",
+ )
+ call_kwargs = MockCls.call_args.kwargs
+ url_metadata = call_kwargs["url_metadata"]
+ assert url_metadata.url == "https://example.com"
+ assert url_metadata.source == "reddit"
+ assert url_metadata.content_type == "post"
+
+ @pytest.mark.asyncio
+ async def test_passes_celery_app_and_timeout(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ MockCls, _ = mock_info_extract
+ await scrape_and_enrich(ctx, url="u", chat_id=1)
+ call_kwargs = MockCls.call_args.kwargs
+ assert "celery_app" in call_kwargs
+ assert "timeout" in call_kwargs
+
+ @pytest.mark.asyncio
+ async def test_disables_telegraph_and_document_for_scraping(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ """Scraping step should not do enrichment — that's handled separately."""
+ MockCls, _ = mock_info_extract
+ await scrape_and_enrich(ctx, url="u", chat_id=1)
+ call_kwargs = MockCls.call_args.kwargs
+ assert call_kwargs["store_telegraph"] is False
+ assert call_kwargs["store_document"] is False
+
+ @pytest.mark.asyncio
+ async def test_passes_enrichment_flags(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ await scrape_and_enrich(
+ ctx, url="u", chat_id=1, store_telegraph=True, store_document=True
+ )
+ mock_enrichment.enrich.assert_awaited_once()
+ call_kwargs = mock_enrichment.enrich.call_args.kwargs
+ assert call_kwargs["store_telegraph"] is True
+ assert call_kwargs["store_document"] is True
+
+ @pytest.mark.asyncio
+ async def test_pushes_result_to_outbox(
+ self, ctx, mock_info_extract, mock_enrichment, mock_outbox
+ ):
+ await scrape_and_enrich(
+ ctx,
+ url="u",
+ chat_id=42,
+ job_id="j1",
+ message_id=99,
+ )
+ mock_outbox.push.assert_awaited_once()
+ call_kwargs = mock_outbox.push.call_args.kwargs
+ assert call_kwargs["job_id"] == "j1"
+ assert call_kwargs["chat_id"] == 42
+ assert call_kwargs["message_id"] == 99
+ assert call_kwargs["metadata_item"] is not None
+
+
+# ---------------------------------------------------------------------------
+# Error path
+# ---------------------------------------------------------------------------
+
+
+class TestScrapeAndEnrichError:
+ @pytest.mark.asyncio
+ async def test_scraping_failure_pushes_error(
+ self, ctx, mock_enrichment, mock_outbox
+ ):
+ with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls:
+ instance = AsyncMock()
+ instance.get_item = AsyncMock(side_effect=RuntimeError("scrape boom"))
+ MockCls.return_value = instance
+
+ result = await scrape_and_enrich(
+ ctx, url="u", chat_id=1, job_id="j-err"
+ )
+
+ assert result["status"] == "error"
+ assert "scrape boom" in result["error"]
+ mock_outbox.push.assert_awaited_once()
+ assert mock_outbox.push.call_args.kwargs["error"] == "scrape boom"
+
+ @pytest.mark.asyncio
+ async def test_enrichment_failure_pushes_error(
+ self, ctx, mock_info_extract, mock_outbox
+ ):
+ with patch("async_worker.tasks.scrape.enrichment") as mock_enrich:
+ mock_enrich.enrich = AsyncMock(
+ side_effect=ValueError("enrich failed")
+ )
+ result = await scrape_and_enrich(
+ ctx, url="u", chat_id=1, job_id="j-err2"
+ )
+
+ assert result["status"] == "error"
+ assert "enrich failed" in result["error"]
+ mock_outbox.push.assert_awaited_once()
+ assert mock_outbox.push.call_args.kwargs["error"] == "enrich failed"
+
+ @pytest.mark.asyncio
+ async def test_error_includes_chat_id_in_outbox(
+ self, ctx, mock_enrichment, mock_outbox
+ ):
+ with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls:
+ instance = AsyncMock()
+ instance.get_item = AsyncMock(side_effect=RuntimeError("fail"))
+ MockCls.return_value = instance
+
+ await scrape_and_enrich(ctx, url="u", chat_id=99999)
+
+ assert mock_outbox.push.call_args.kwargs["chat_id"] == 99999
diff --git a/tests/unit/file_export/__init__.py b/tests/unit/file_export/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/file_export/test_audio_transcribe.py b/tests/unit/file_export/test_audio_transcribe.py
new file mode 100644
index 0000000..92584b4
--- /dev/null
+++ b/tests/unit/file_export/test_audio_transcribe.py
@@ -0,0 +1,106 @@
+"""Tests for packages/shared/fastfetchbot_shared/services/file_export/audio_transcribe.py"""
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from fastfetchbot_shared.services.file_export.audio_transcribe import AudioTranscribe
+
+
+# ---------------------------------------------------------------------------
+# __init__
+# ---------------------------------------------------------------------------
+
+
+class TestAudioTranscribeInit:
+ def test_stores_all_fields(self):
+ mock_celery = MagicMock()
+ at = AudioTranscribe(
+ audio_file="/tmp/audio.mp3",
+ celery_app=mock_celery,
+ timeout=300,
+ )
+ assert at.audio_file == "/tmp/audio.mp3"
+ assert at.celery_app is mock_celery
+ assert at.timeout == 300
+
+ def test_default_timeout(self):
+ mock_celery = MagicMock()
+ at = AudioTranscribe(audio_file="/tmp/a.mp3", celery_app=mock_celery)
+ assert at.timeout == 600
+
+
+# ---------------------------------------------------------------------------
+# transcribe
+# ---------------------------------------------------------------------------
+
+
+class TestTranscribe:
+ @pytest.mark.asyncio
+ async def test_transcribe_success(self):
+ mock_result = MagicMock()
+ mock_result.get.return_value = {"transcript": "Hello world, this is a test."}
+ mock_celery = MagicMock()
+ mock_celery.send_task.return_value = mock_result
+
+ at = AudioTranscribe(
+ audio_file="/tmp/audio.mp3",
+ celery_app=mock_celery,
+ timeout=120,
+ )
+ text = await at.transcribe()
+
+ assert text == "Hello world, this is a test."
+
+ @pytest.mark.asyncio
+ async def test_transcribe_sends_correct_task(self):
+ mock_result = MagicMock()
+ mock_result.get.return_value = {"transcript": "ok"}
+ mock_celery = MagicMock()
+ mock_celery.send_task.return_value = mock_result
+
+ at = AudioTranscribe(
+ audio_file="/tmp/speech.wav", celery_app=mock_celery
+ )
+ await at.transcribe()
+
+ mock_celery.send_task.assert_called_once_with(
+ "file_export.transcribe",
+ kwargs={"audio_file": "/tmp/speech.wav"},
+ )
+
+ @pytest.mark.asyncio
+ async def test_transcribe_uses_timeout(self):
+ mock_result = MagicMock()
+ mock_result.get.return_value = {"transcript": "ok"}
+ mock_celery = MagicMock()
+ mock_celery.send_task.return_value = mock_result
+
+ at = AudioTranscribe(audio_file="f", celery_app=mock_celery, timeout=99)
+ await at.transcribe()
+
+ mock_result.get.assert_called_once_with(timeout=99)
+
+ @pytest.mark.asyncio
+ async def test_transcribe_failure_reraises(self):
+ mock_result = MagicMock()
+ mock_result.get.side_effect = RuntimeError("transcription failed")
+ mock_celery = MagicMock()
+ mock_celery.send_task.return_value = mock_result
+
+ at = AudioTranscribe(audio_file="f", celery_app=mock_celery)
+
+ with pytest.raises(RuntimeError, match="transcription failed"):
+ await at.transcribe()
+
+ @pytest.mark.asyncio
+ async def test_transcribe_timeout_error(self):
+ mock_result = MagicMock()
+ mock_result.get.side_effect = TimeoutError("celery timeout")
+ mock_celery = MagicMock()
+ mock_celery.send_task.return_value = mock_result
+
+ at = AudioTranscribe(audio_file="f", celery_app=mock_celery)
+
+ with pytest.raises(TimeoutError):
+ await at.transcribe()
diff --git a/tests/unit/file_export/test_pdf_export.py b/tests/unit/file_export/test_pdf_export.py
new file mode 100644
index 0000000..665901c
--- /dev/null
+++ b/tests/unit/file_export/test_pdf_export.py
@@ -0,0 +1,182 @@
+"""Tests for packages/shared/fastfetchbot_shared/services/file_export/pdf_export.py"""
+
+import asyncio
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from fastfetchbot_shared.services.file_export.pdf_export import PdfExport, wrap_html_string
+
+
+# ---------------------------------------------------------------------------
+# wrap_html_string (pure function)
+# ---------------------------------------------------------------------------
+
+
+class TestWrapHtmlString:
+ def test_wraps_in_html_document(self):
+ result = wrap_html_string("Hello
")
+ assert "" in result
+ assert "" in result
+ assert "" in result
+ assert "Hello" in result
+
+ def test_includes_utf8_meta_tags(self):
+ result = wrap_html_string("test
")
+ assert 'charset="UTF-8"' in result or "charset=utf-8" in result.lower()
+
+ def test_strips_inline_styles(self):
+ html = '<p style="color: red">styled text</p>'
+ result = wrap_html_string(html)
+ assert 'style=' not in result
+ assert "styled text" in result
+
+ def test_removes_style_tags(self):
+ html = "content
"
+ result = wrap_html_string(html)
+ assert "