diff --git a/docs/_sidebar.md b/docs/_sidebar.md index bef51b37..2b8efd4a 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -13,6 +13,7 @@ * [分布式爬虫-Spider](usage/Spider.md) * [任务爬虫-TaskSpider](usage/TaskSpider.md) * [批次爬虫-BatchSpider](usage/BatchSpider.md) + * [文件爬虫-FileSpider](usage/FileSpider.md) * [爬虫集成](usage/爬虫集成.md) * 使用进阶 diff --git "a/docs/foreword/\345\212\237\350\203\275\346\246\202\350\247\210.md" "b/docs/foreword/\345\212\237\350\203\275\346\246\202\350\247\210.md" index 9c714a34..4302064f 100644 --- "a/docs/foreword/\345\212\237\350\203\275\346\246\202\350\247\210.md" +++ "b/docs/foreword/\345\212\237\350\203\275\346\246\202\350\247\210.md" @@ -24,19 +24,23 @@ 2. 内存去重:处理一万条数据约0.5秒。 去重一亿条数据占用内存约285MB 3. 永久去重:处理一万条数据约3.5秒。去重一亿条数据占用内存约285MB -## 5. 数据采集完整性 +## 5. 支持批量文件下载 + +FileSpider 专用于批量下载文件/图片场景。一个任务包含多个待下载文件的 URL 列表,框架自动遍历生成下载请求,追踪下载进度,支持保存到本地磁盘或直接上传云存储。内置可选的文件去重机制,同一 URL 跨任务不重复下载。 + +## 6. 数据采集完整性 feapder对于每一条URL数据的抓取采取了强状态的控制,做到采集任务中URL抓取100%不丢失,即使多次尝试失败的URL也会进入错误队列并记录失败原因日志。这一特性对于很多强依赖采集数据的业务场景非常重要,保证数据用的放心。 -## 6. 数据自动入库 +## 7. 数据自动入库 只需要根据数据库表自动生成item,然后给item属性赋值,直接yield 返回即可批量入库 -## 7. 支持Debug模式 +## 8. 支持Debug模式 爬虫支持debug模式,debug模式下默认数据不入库、不修改任务状态。可针对某个任务进行调试,方便开发 -## 8. 完善的报警机制 +## 9. 完善的报警机制 为了保证数据的全量性、准确性、时效性,本框架内置报警机制,有了这些报警,我们可以实时掌握爬虫状态 @@ -53,7 +57,7 @@ feapder对于每一条URL数据的抓取采取了强状态的控制,做到采 ![-w416](http://markdown-media.oss-cn-beijing.aliyuncs.com/2020/12/29/16092335882158.jpg) -## 9. 下载监控 +## 10. 下载监控 框架对请求总数、成功数、失败数、解析异常数进行监控,将数据点打入到infuxdb,结合Grafana面板,可方便掌握抓取情况 diff --git a/docs/usage/FileSpider.md b/docs/usage/FileSpider.md new file mode 100644 index 00000000..74c143e4 --- /dev/null +++ b/docs/usage/FileSpider.md @@ -0,0 +1,409 @@ +# FileSpider + +FileSpider 是一款分布式文件下载爬虫,专用于批量下载文件/图片的场景。 + +核心特征: +- **一对多**: 一个任务包含多个待下载文件的 URL 列表,框架自动遍历生成下载请求 +- **进度追踪**: 框架自动追踪每个任务的下载进度(成功数/失败数/跳过数/去重数/总数) +- **结果有序**: 下载结果列表与原始 URL 列表严格位置对应 +- **灵活存储**: 默认保存到本地磁盘,可重写为上传云存储(OSS/S3 等),不落盘 +- **文件去重**: 任务内相同 URL 自动去重;可选跨任务去重(Redis / MySQL / 自定义) +- **HTTP 校验**: 默认对 4xx/5xx 响应触发重试,用户可重写 `validate` 自定义校验 +- **用户控制**: 任务成功/失败由用户在回调中显式决定 + +FileSpider 继承自 TaskSpider,复用了全部任务管理能力(MySQL 任务表、Redis 队列、断点续爬、丢失任务回收、分布式支持等)。 + +## 1. 任务表 + +### MySQL 任务表(建议结构) + +```sql +CREATE TABLE `file_task` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `file_urls` text COMMENT '待下载文件URL列表,JSON数组格式', + `state` int(11) DEFAULT 0 COMMENT '任务状态: 0待做 2下载中 1完成 -1失败', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` + +字段说明: +- `id`: 任务主键,必须有 +- `file_urls`: 存放待下载文件 URL 的 JSON 数组,字段名可自定义 +- `state`: 任务状态字段,字段名可通过 `task_state` 参数配置。0=待做,2=已下发(框架自动设置),1=完成,-1=失败(由用户代码设置) + +## 2. 
用户需实现的方法 + +### 必须实现 + +| 方法 | 说明 | +|------|------| +| `get_download_urls(task)` | 从 task 中提取文件 URL 列表,返回 `List[str]` | +| `on_task_all_done(task, result, success_count, fail_count, skipped_count, dup_count, total_count)` | 任务所有文件处理完毕的回调,在此 yield Item 或 update_task_batch 更新状态 | + +### 可选重写 + +| 方法 | 说明 | 默认行为 | +|------|------|----------| +| `get_file_path(task, url, index)` | 返回文件保存路径/存储标识 | `{save_dir}/{task_id}/{index}_{md5(filename)}{ext}` | +| `process_file(task_id, url, file_path, response)` | 处理文件内容,返回最终存储位置(需保证幂等,不可返回空值) | 流式保存到本地磁盘,返回本地路径 | +| `validate(request, response)` | 校验下载响应 | 4xx/5xx抛异常触发重试,3xx自动跟随 | +| `on_file_downloaded(task_id, url, file_path)` | 单个文件下载成功回调 | 无 | +| `on_file_failed(task_id, url, error)` | 单个文件下载失败回调 | 无 | + +### 方法分层 + +``` +save_file (框架层,不应重写) + ├── process_file (用户层,按需重写) + │ ├── 默认: 保存到本地磁盘,返回本地路径 + │ └── 重写: 上传云存储,返回云存储 URL + ├── Redis 进度追踪 (自动,幂等计数) + ├── on_file_downloaded 回调 + └── 检查是否所有文件完成 + └── on_task_all_done (用户实现) + ├── yield Item → 写入结果表 + └── yield update_task_batch → 更新任务状态 +``` + +### `process_file` 约束 + +`process_file` 在下载失败重试时可能被多次调用(同一 URL、同一 `file_path`),实现需保证幂等性: +- 默认实现使用 `"wb"` 模式覆盖写入,天然幂等 +- 重写时避免使用追加模式(`"ab"`) +- 云存储场景建议使用 `put_object` 等覆盖语义的 API + +**返回值要求**: 必须返回非空字符串(文件最终存储位置)。返回 `None` 或空字符串 `""` 会被视为处理失败,触发框架重试,直至重试次数耗尽后计入失败。 + +### `on_task_all_done` 参数说明 + +```python +def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + """ + task: PerfectDict - 任务对象,包含 task_keys 指定的字段,可通过 task.id 获取任务 ID + result: List[str|None] + - 与 get_download_urls 返回的列表严格位置对应 + - 成功: 文件存储位置(本地路径或云存储 URL) + - 失败/跳过: None + - 任务内重复URL: 继承首次出现的结果 + 例: ["https://oss.com/a.jpg", "https://oss.com/b.jpg", None, "https://oss.com/a.jpg"] + success_count: 成功数(含去重缓存命中) + fail_count: 下载失败数(重试耗尽) + skipped_count: 跳过数(无效URL、get_file_path异常等) + dup_count: 任务内重复URL数 + total_count: 总数(success + fail + skipped + dup = total) + """ +``` + +### `on_task_all_done` 设计约定与实现建议 + +`on_task_all_done` 是业务回调,**任务状态由用户代码显式控制**(通常通过 `yield self.update_task_batch(...)`)。 + +- 若该方法抛异常,框架不会自动改写任务状态;任务可能保持 `doing(2)` +- 后续会由 TaskSpider 的丢失任务恢复机制重新下发任务 +- 因此该方法建议按“可重试、可重入”方式实现,保证幂等 + +推荐实践: +- 先产出结果数据,再更新任务状态,避免状态先行导致结果缺失 +- 对外部副作用(通知、回调第三方、写非幂等系统)增加幂等保护 +- 异常日志要包含 `task.id`、计数信息和关键上下文,便于快速排障 + +#### 新手解释:什么是“幂等” + +幂等可以理解为:**同一个操作执行 1 次和执行多次,最终结果一致**。 + +在 `FileSpider` 中,常见重试来源有网络重试、进程重启、丢失任务回收。 +因此 `on_task_all_done` 需要按“可能被重复执行”来设计: + +- 幂等写法:`state` 直接设置为目标值(如 1 或 -1) +- 非幂等写法:每次执行都做自增/重复插入/重复通知 + +#### 推荐写法案例(可重试、可重入) + +```python +from feapder.utils.log import log + + +class MyFileSpider(feapder.FileSpider): + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + task_id = task.id + log.info( + f"任务{task_id}完成 success={success_count} fail={fail_count} " + f"skipped={skipped_count} dup={dup_count} total={total_count}" + ) + + # 1) 先写业务结果(示例:可按需 yield Item) + # item = FileResultItem() + # item.task_id = task_id + # item.result_urls = result + # yield item + + # 2) 最后更新任务状态(设置目标值,天然幂等) + done_state = 1 if fail_count == 0 and success_count > 0 else -1 + yield self.update_task_batch(task_id, done_state) +``` + +## 3. 
构造参数 + +| 参数 | 类型 | 说明 | +|------|------|------| +| `redis_key` | str | Redis key 前缀(必填) | +| `task_table` | str | MySQL 任务表名(必填) | +| `task_keys` | list | 需要获取的任务字段列表(必填) | +| `save_dir` | str | 文件保存根目录,默认 `./downloads` | +| `file_dedup` | None/str/FileDedup | 文件去重策略:None 不去重,`"redis"` / `"mysql"` / FileDedup 实例 | +| `file_dedup_expire` | int | Redis 去重缓存过期时间(秒),仅 `file_dedup="redis"` 时生效 | +| `task_state` | str | 任务状态字段名,默认 `state` | +| `min_task_count` | int | Redis 中最少任务数,默认 10000 | +| `check_task_interval` | int | 检查任务间隔(秒),默认 5 | +| `task_limit` | int | 每次取任务数量,默认 10000 | +| `task_condition` | str | 任务筛选条件(WHERE 后的 SQL) | +| `task_order_by` | str | 取任务排序条件 | +| `thread_count` | int | 线程数 | +| `keep_alive` | bool | 是否常驻 | + +## 4. 使用示例 + +### 启动方式(单进程 / master-worker 分离) + +FileSpider 支持两种启动方式: + +1. 单进程:`spider.start()`,适合本地调试 +2. 分离运行:master 仅负责派发任务,worker 仅负责下载处理,适合生产部署 + +```python +from feapder import ArgumentParser + +if __name__ == "__main__": + spider = MyFileSpider( + redis_key="my_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + ) + + parser = ArgumentParser(description="MyFileSpider 文件下载爬虫") + parser.add_argument( + "--start_master", + action="store_true", + help="添加任务", + function=spider.start_monitor_task, + ) + parser.add_argument( + "--start_worker", + action="store_true", + help="启动爬虫", + function=spider.start, + ) + parser.start() +``` + +命令行启动: + +```bash +uv run my_file_spider.py --start_master +uv run my_file_spider.py --start_worker +``` + +### 场景一:保存到本地磁盘 + +最简单的用法,下载文件保存到本地: + +```python +import json +import feapder + + +class LocalFileSpider(feapder.FileSpider): + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + # fail_count == 0 且有实际成功下载则标记完成;全部跳过或无有效URL标记失败 + if fail_count == 0 and success_count > 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) + + +if __name__ == "__main__": + spider = LocalFileSpider( + redis_key="local_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + save_dir="./downloads", + ) + spider.start() +``` + +### 场景二:上传云存储 + +重写 `process_file` 实现直接上传云存储: + +```python +import json +import os +import feapder +from urllib.parse import urlparse, unquote + + +class OssFileSpider(feapder.FileSpider): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # 初始化云存储客户端 + self.oss_client = OSSClient(bucket="my-bucket") + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def get_file_path(self, task, url, index): + """返回 OSS 存储 key(不是本地路径)""" + filename = os.path.basename(unquote(urlparse(url).path)) + return f"files/{task.id}/{index}_{filename}" + + def process_file(self, task_id, url, file_path, response): + """上传 OSS,返回云存储 URL""" + self.oss_client.put_object(file_path, response.content) + return f"https://my-bucket.oss.aliyuncs.com/{file_path}" + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + if success_count > 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) + + +if __name__ == "__main__": + spider = OssFileSpider( + redis_key="oss_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + ) + spider.start() +``` + +### 场景三:上传云存储 + 结果入库 + +先创建结果 Item: + +```bash +feapder create -i file_result +``` + +编辑生成的 
`items/file_result_item.py`,添加所需字段,然后在爬虫中引用: + +```python +import json +import os +import feapder +from urllib.parse import urlparse, unquote +from items.file_result_item import FileResultItem + + +class OssResultSpider(feapder.FileSpider): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.oss_client = OSSClient(bucket="my-bucket") + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def get_file_path(self, task, url, index): + filename = os.path.basename(unquote(urlparse(url).path)) + return f"files/{task.id}/{index}_{filename}" + + def process_file(self, task_id, url, file_path, response): + self.oss_client.put_object(file_path, response.content) + return f"https://my-bucket.oss.aliyuncs.com/{file_path}" + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + # result 与 get_download_urls 返回的列表严格位置对应,下载失败的用 None 占位 + item = FileResultItem() + item.task_id = task.id + item.result_urls = result + yield item + + if fail_count == 0 and success_count > 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) +``` + +### 场景四:启用文件去重 + +通过 `file_dedup` 参数启用跨任务去重,同一 URL 跨任务不重复下载: + +```python +import json +import feapder + + +class DedupFileSpider(feapder.FileSpider): + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + yield self.update_task_batch(task.id, 1 if fail_count == 0 and success_count > 0 else -1) + + +if __name__ == "__main__": + spider = DedupFileSpider( + redis_key="dedup_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + save_dir="./downloads", + file_dedup="redis", # "redis" / "mysql" / FileDedup 实例 + ) + spider.start() +``` + +## 5. 文件去重 + +### 去重层级 + +FileSpider 提供两级去重: + +1. **任务内去重(自动)**: 同一任务的 URL 列表中出现的重复 URL,只下载一次,重复项继承首次出现的结果 +2. **跨任务去重(可选)**: 通过 `file_dedup` 参数启用,不同任务中出现的相同 URL 只下载一次 + +### 跨任务去重策略 + +| 策略 | 参数值 | 存储 | 适用场景 | +|------|--------|------|----------| +| 不去重 | `None`(默认) | - | 每次都重新下载 | +| Redis 去重 | `"redis"` | Redis Hash | 分布式共享,多进程安全 | +| MySQL 去重 | `"mysql"` | MySQL 表(按 `redis_key` 自动分表) | 持久化,隔离不同业务 | +| 自定义去重 | `FileDedup` 实例 | 用户自定义 | 特殊需求 | + +### 自定义去重 + +继承 `FileDedup` 接口: + +```python +from feapder.dedup.file_dedup import FileDedup + +class MyFileDedup(FileDedup): + def get(self, url): + """返回缓存结果,无缓存返回 None""" + ... + + def set(self, url, result_url): + """缓存处理结果""" + ... +``` + +## 6. 
Debug 模式 + +支持 Debug 模式,可针对单个任务调试: + +```python +if __name__ == "__main__": + spider = MyFileSpider.to_DebugFileSpider( + task_id=1, + redis_key="my_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + save_dir="./downloads", + ) + spider.start() +``` + +Debug 模式下默认不入库、不更新任务状态。 diff --git a/feapder/__init__.py b/feapder/__init__.py index 565be4b9..ed5acf90 100644 --- a/feapder/__init__.py +++ b/feapder/__init__.py @@ -18,9 +18,11 @@ "Spider", "TaskSpider", "BatchSpider", + "FileSpider", "BaseParser", "TaskParser", "BatchParser", + "FileParser", "Request", "Response", "Item", @@ -28,8 +30,8 @@ "ArgumentParser", ] -from feapder.core.spiders import AirSpider, Spider, TaskSpider, BatchSpider -from feapder.core.base_parser import BaseParser, TaskParser, BatchParser +from feapder.core.spiders import AirSpider, Spider, TaskSpider, BatchSpider, FileSpider +from feapder.core.base_parser import BaseParser, TaskParser, BatchParser, FileParser from feapder.network.request import Request from feapder.network.response import Response from feapder.network.item import Item, UpdateItem diff --git a/feapder/commands/create/create_spider.py b/feapder/commands/create/create_spider.py index f464e059..13243312 100644 --- a/feapder/commands/create/create_spider.py +++ b/feapder/commands/create/create_spider.py @@ -57,8 +57,10 @@ def get_spider_template(self, spider_type): template_path = "task_spider_template.tmpl" elif spider_type == "BatchSpider": template_path = "batch_spider_template.tmpl" + elif spider_type == "FileSpider": + template_path = "file_spider_template.tmpl" else: - raise ValueError("spider type error, only support AirSpider、 Spider、TaskSpider、BatchSpider") + raise ValueError("spider type error, only support AirSpider、Spider、TaskSpider、BatchSpider、FileSpider") template_path = os.path.abspath( os.path.join(__file__, "../../../templates", template_path) diff --git a/feapder/commands/create_builder.py b/feapder/commands/create_builder.py index dec0ba05..1703ebf9 100644 --- a/feapder/commands/create_builder.py +++ b/feapder/commands/create_builder.py @@ -87,7 +87,7 @@ def main(): elif args.spider: c = Choice( "请选择爬虫模板", - ["AirSpider", "Spider", "TaskSpider", "BatchSpider"], + ["AirSpider", "Spider", "TaskSpider", "BatchSpider", "FileSpider"], icon_style=StringStyle(fore=Fore.green), selected_style=StringStyle(fore=Fore.green), ) diff --git a/feapder/core/base_parser.py b/feapder/core/base_parser.py index a06f9c44..42d76bd0 100644 --- a/feapder/core/base_parser.py +++ b/feapder/core/base_parser.py @@ -8,6 +8,7 @@ @email: boris_liu@foxmail.com """ import os +from urllib.parse import urlparse, unquote import feapder.utils.tools as tools from feapder.db.mysqldb import MysqlDB @@ -191,6 +192,98 @@ def update_task_batch(self, task_id, state=1, **kwargs): return update_item +class FileParser(TaskParser): + """ + @summary: 文件下载爬虫模版 + --------- + """ + + def __init__(self, task_table, task_state, mysqldb=None, save_dir="./downloads"): + super(FileParser, self).__init__( + task_table=task_table, task_state=task_state, mysqldb=mysqldb + ) + self._save_dir = save_dir + + def get_download_urls(self, task): + """ + 从 task 中获取需要下载的文件 URL 列表,用户必须实现 + @param task: 任务信息 + @return: List[str] - URL 列表 + """ + raise NotImplementedError("必须实现 get_download_urls 方法") + + def get_file_path(self, task, url, index): + """ + 返回文件保存路径/标识,用户可重写 + 本地场景: 返回本地文件路径,如 ./downloads/123/0_image.jpg + 云存储场景: 返回存储标识/key,如 bucket/prefix/123/0_image.jpg + @param task: 任务信息 + @param url: 文件 URL + @param index: 文件在 URL 
列表中的索引,默认实现用于避免同名文件覆盖 + @return: str - 文件路径或存储标识 + """ + parsed = urlparse(url) + filename = os.path.basename(unquote(parsed.path)) or "unknown" + filename = f"{index}_{filename}" + return os.path.join(self._save_dir, str(task.id), filename) + + def process_file(self, task_id, url, file_path, response): + """ + 处理下载的文件内容,返回文件最终存储位置。用户按需重写 + 默认实现: 流式保存到本地磁盘,返回本地路径 + 云存储场景: 重写此方法上传到 OSS/S3 等,返回云存储 URL + 注意: + - 此方法在下载失败重试时可能被多次调用,实现需保证幂等性 + - 必须返回非空字符串,返回空值会触发重试直至失败 + @param task_id: 任务 ID + @param url: 文件原始 URL + @param file_path: get_file_path 返回的路径/标识 + @param response: 下载响应 + @return: str - 文件最终存储位置(不可为空) + """ + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + return file_path + + def on_file_downloaded(self, task_id, url, file_path): + """ + 单个文件下载成功的回调,用户可重写 + @param task_id: 任务 ID + @param url: 文件原始 URL + @param file_path: 文件存储位置 + """ + pass + + def on_file_failed(self, task_id, url, error): + """ + 单个文件下载失败的回调,用户可重写 + @param task_id: 任务 ID + @param url: 文件原始 URL + @param error: 异常信息 + """ + pass + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + """ + 任务所有文件处理完毕的回调 + 用户应在此方法中 yield Item 写入结果表、yield self.update_task_batch() 更新任务状态 + @param task: PerfectDict - 任务对象,包含 task_keys 指定的字段 + @param result: List[str|None] - 每个文件的处理结果, + 顺序与 get_download_urls 返回的列表一致。 + 成功为文件存储位置(本地路径或云存储 URL),失败为 None。 + 任务内重复URL的结果继承首次出现的结果 + @param success_count: 成功数(含去重缓存命中) + @param fail_count: 下载失败数(重试耗尽) + @param skipped_count: 跳过数(无效URL、get_file_path异常等) + @param dup_count: 任务内重复URL数 + @param total_count: 总数(success + fail + skipped + dup = total) + """ + pass + + class BatchParser(TaskParser): """ @summary: 批次爬虫模版 diff --git a/feapder/core/parser_control.py b/feapder/core/parser_control.py index 021d2956..97a71fae 100644 --- a/feapder/core/parser_control.py +++ b/feapder/core/parser_control.py @@ -33,7 +33,7 @@ class ParserControl(threading.Thread): is_show_tip = False - # 实时统计已做任务数及失败任务数,若失败任务数/已做任务数>0.5 则报警 + # 实时统计请求成功数及失败数,用于计算请求成功率报警 _success_task_count = 0 _failed_task_count = 0 _total_task_count = 0 @@ -396,6 +396,13 @@ def deal_request(self, request): if response and getattr(response, "browser", None): request.render_downloader.put_back(response.browser) + # 释放连接(stream=True 时未消费完 body 会占用连接池) + if response and hasattr(response, "close"): + try: + response.close() + except Exception: + pass + break # 删除正在做的request 跟随item优先 @@ -455,7 +462,7 @@ def add_parser(self, parser: BaseParser): class AirSpiderParserControl(ParserControl): is_show_tip = False - # 实时统计已做任务数及失败任务数,若失败任务数/已做任务数>0.5 则报警 + # 实时统计请求成功数及失败数,用于计算请求成功率报警 _success_task_count = 0 _failed_task_count = 0 @@ -732,6 +739,13 @@ def deal_request(self, request): if response and getattr(response, "browser", None): request.render_downloader.put_back(response.browser) + # 释放连接(stream=True 时未消费完 body 会占用连接池) + if response and hasattr(response, "close"): + try: + response.close() + except Exception: + pass + break if setting.SPIDER_SLEEP_TIME: diff --git a/feapder/core/scheduler.py b/feapder/core/scheduler.py index 0177d185..caff471d 100644 --- a/feapder/core/scheduler.py +++ b/feapder/core/scheduler.py @@ -330,19 +330,19 @@ def check_task_status(self): else: return - # 检查失败任务数量 超过1000 报警, + # 检查失败请求数量,超过阈值则报警 failed_count = self._redisdb.zget_count(self._tab_failed_requests) if failed_count > setting.WARNING_FAILED_COUNT: # 发送报警 - msg = "《%s》爬虫当前失败任务数:%s, 
请检查爬虫是否正常" % (self._spider_name, failed_count) + msg = "《%s》爬虫当前失败请求数:%s, 请检查爬虫是否正常" % (self._spider_name, failed_count) log.error(msg) self.send_msg( msg, level="error", - message_prefix="《%s》爬虫当前失败任务数报警" % (self._spider_name), + message_prefix="《%s》爬虫当前失败请求数报警" % (self._spider_name), ) - # parser_control实时统计已做任务数及失败任务数,若成功率<0.5 则报警 + # parser_control实时统计请求成功数及失败数,若请求成功率低于阈值则报警 ( failed_task_count, success_task_count, @@ -351,9 +351,9 @@ def check_task_status(self): total_count = success_task_count + failed_task_count if total_count > 0: task_success_rate = success_task_count / total_count - if task_success_rate < 0.5: + if task_success_rate < setting.WARNING_SUCCESS_RATE: # 发送报警 - msg = "《%s》爬虫当前任务成功数%s, 失败数%s, 成功率 %.2f, 请检查爬虫是否正常" % ( + msg = "《%s》爬虫当前请求成功数%s, 失败数%s, 成功率 %.2f, 请检查爬虫是否正常" % ( self._spider_name, success_task_count, failed_task_count, @@ -363,7 +363,7 @@ def check_task_status(self): self.send_msg( msg, level="error", - message_prefix="《%s》爬虫当前任务成功率报警" % (self._spider_name), + message_prefix="《%s》爬虫当前请求成功率报警" % (self._spider_name), ) # 判断任务数是否变化 diff --git a/feapder/core/spiders/__init__.py b/feapder/core/spiders/__init__.py index a32ba668..041e47fe 100644 --- a/feapder/core/spiders/__init__.py +++ b/feapder/core/spiders/__init__.py @@ -8,9 +8,10 @@ @email: boris_liu@foxmail.com """ -__all__ = ["AirSpider", "TaskSpider", "Spider", "BatchSpider"] +__all__ = ["AirSpider", "TaskSpider", "Spider", "BatchSpider", "FileSpider"] from feapder.core.spiders.air_spider import AirSpider from feapder.core.spiders.spider import Spider from feapder.core.spiders.task_spider import TaskSpider from feapder.core.spiders.batch_spider import BatchSpider +from feapder.core.spiders.file_spider import FileSpider diff --git a/feapder/core/spiders/file_spider.py b/feapder/core/spiders/file_spider.py new file mode 100644 index 00000000..d1c7e54f --- /dev/null +++ b/feapder/core/spiders/file_spider.py @@ -0,0 +1,738 @@ +# -*- coding: utf-8 -*- +""" +Created on 2026/4/7 +--------- +@summary: 文件下载爬虫 +--------- +""" + +import hashlib +import os +import re +import warnings +from urllib.parse import urlparse, unquote + +from redis.exceptions import NoScriptError + +import feapder.setting as setting +import feapder.utils.tools as tools +from feapder.core.spiders.task_spider import TaskSpider +from feapder.dedup.file_dedup import FileDedup, RedisFileDedup, MysqlFileDedup +from feapder.network.item import UpdateItem +from feapder.network.request import Request +from feapder.utils.log import log + +CONSOLE_PIPELINE_PATH = "feapder.pipelines.console_pipeline.ConsolePipeline" + + +class FileSpider(TaskSpider): + """ + 文件下载爬虫 + + 基于 TaskSpider,专用于批量下载文件/图片的场景。 + - 一个任务包含多个待下载文件的 URL 列表(一对多) + - 框架自动追踪每个任务的下载进度 + - 支持保存到本地磁盘或上传云存储 + - 任务成功/失败由用户在 on_task_all_done 中显式决定 + - 可选文件去重,同一 URL 不重复下载 + """ + + def __init__( + self, + redis_key, + task_table, + task_keys, + save_dir="./downloads", + file_dedup=None, + file_dedup_expire=None, + task_table_type="mysql", + task_state="state", + min_task_count=10000, + check_task_interval=5, + task_limit=10000, + related_redis_key=None, + related_batch_record=None, + task_condition="", + task_order_by="", + thread_count=None, + begin_callback=None, + end_callback=None, + delete_keys=(), + keep_alive=None, + batch_interval=0, + use_mysql=True, + **kwargs, + ): + """ + @summary: 文件下载爬虫 + --------- + @param redis_key: 任务等数据存放在 redis 中的 key 前缀 + @param task_table: mysql 中的任务表 + @param task_keys: 需要获取的任务字段 列表 + @param save_dir: 文件保存根目录,默认 ./downloads + @param file_dedup: 文件去重策略。 + 
None: 不去重(默认) + "redis": 使用 Redis Hash 去重 + "mysql": 使用 MySQL 表去重 + FileDedup 实例: 自定义去重实现 + @param file_dedup_expire: Redis 去重缓存过期时间(秒),仅 file_dedup="redis" 时生效 + @param task_table_type: 任务表类型 支持 redis、mysql + @param task_state: mysql 中任务表的任务状态字段 + @param min_task_count: redis 中最少任务数,少于这个数量会从种子表中取任务 + @param check_task_interval: 检查是否还有任务的时间间隔 + @param task_limit: 每次从数据库中取任务的数量 + @param related_redis_key: 有关联的其他爬虫任务表(redis) + @param related_batch_record: 有关联的其他爬虫批次表(mysql) + @param task_condition: 任务条件,用于筛选任务 + @param task_order_by: 取任务时的排序条件 + @param thread_count: 线程数 + @param begin_callback: 爬虫开始回调函数 + @param end_callback: 爬虫结束回调函数 + @param delete_keys: 爬虫启动时删除的 key + @param keep_alive: 爬虫是否常驻 + @param batch_interval: 抓取时间间隔(天) + @param use_mysql: 是否使用 mysql 数据库 + --------- + """ + + super(FileSpider, self).__init__( + redis_key=redis_key, + task_table=task_table, + task_table_type=task_table_type, + task_keys=task_keys, + task_state=task_state, + min_task_count=min_task_count, + check_task_interval=check_task_interval, + task_limit=task_limit, + related_redis_key=related_redis_key, + related_batch_record=related_batch_record, + task_condition=task_condition, + task_order_by=task_order_by, + thread_count=thread_count, + begin_callback=begin_callback, + end_callback=end_callback, + delete_keys=delete_keys, + keep_alive=keep_alive, + batch_interval=batch_interval, + use_mysql=use_mysql, + **kwargs, + ) + + self._save_dir = save_dir + + if file_dedup == "redis": + dedup_table = setting.TAB_FILE_DEDUP.format(redis_key=self._redis_key) + self._file_dedup = RedisFileDedup(dedup_table, file_dedup_expire) + elif file_dedup == "mysql": + if file_dedup_expire is not None: + log.warning("file_dedup_expire仅在file_dedup='redis'时生效") + redis_namespace = re.sub(r"[^0-9a-zA-Z_]+", "_", self._redis_key).strip("_") + dedup_table = f"file_dedup_{redis_namespace}" if redis_namespace else "file_dedup_default" + self._file_dedup = MysqlFileDedup(table=dedup_table) + elif isinstance(file_dedup, FileDedup): + self._file_dedup = file_dedup + elif file_dedup is not None: + raise ValueError( + f"file_dedup参数无效: {file_dedup!r}, " + f"支持: None, 'redis', 'mysql', 或 FileDedup 实例" + ) + else: + self._file_dedup = None + + self._lua_record_and_check_sha = self._redisdb._redis.script_load( + self._LUA_RECORD_AND_CHECK + ) + + # ===================== 用户需实现/可重写的方法 ===================== + + def get_download_urls(self, task): + """ + 从 task 中获取需要下载的文件 URL 列表,用户必须实现 + @param task: 任务信息 + @return: List[str] - URL 列表 + """ + raise NotImplementedError("必须实现 get_download_urls 方法") + + def get_file_path(self, task, url, index): + """ + 返回文件保存路径/标识,用户可重写 + 本地场景: 返回本地文件路径 + 云存储场景: 返回存储标识/key + @param task: 任务信息 + @param url: 文件 URL + @param index: 文件在 URL 列表中的索引,默认实现用于避免同名文件覆盖 + @return: str + """ + parsed = urlparse(url) + raw_name = os.path.basename(unquote(parsed.path)) or "unknown" + _, ext = os.path.splitext(raw_name) + name_hash = hashlib.md5(raw_name.encode()).hexdigest() + filename = f"{index}_{name_hash}{ext}" + return os.path.join(self._save_dir, str(task.id), filename) + + def process_file(self, task_id, url, file_path, response): + """ + 处理下载的文件内容,返回文件最终存储位置。用户按需重写 + 默认实现: 流式保存到本地磁盘,返回本地路径 + 云存储场景: 重写此方法上传到 OSS/S3 等,返回云存储 URL + 注意: + - 此方法在下载失败重试时可能被多次调用,实现需保证幂等性 + - 必须返回非空字符串,返回空值会触发重试直至失败 + @param task_id: 任务 ID + @param url: 文件原始 URL + @param file_path: get_file_path 返回的路径/标识 + @param response: 下载响应 + @return: str - 文件最终存储位置(不可为空) + """ + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 
"wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + return file_path + + def validate(self, request, response): + """文件下载默认校验: 4xx/5xx响应抛异常触发重试,3xx由requests自动跟随。用户可重写""" + if response and response.status_code >= 400: + raise Exception( + f"文件下载HTTP {response.status_code} url={request.url}" + ) + + def on_file_downloaded(self, task_id, url, file_path): + """ + 单个文件下载成功的回调,用户可重写 + @param task_id: 任务 ID + @param url: 文件原始 URL + @param file_path: 文件存储位置 + """ + pass + + def on_file_failed(self, task_id, url, error): + """ + 单个文件下载失败的回调,用户可重写 + @param task_id: 任务 ID + @param url: 文件原始 URL + @param error: 异常信息 + """ + pass + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + """ + 任务所有文件处理完毕的回调 + 用户应在此方法中 yield Item 写入结果表、yield self.update_task_batch() 更新任务状态 + @param task: PerfectDict - 任务对象,包含 task_keys 指定的字段 + @param result: List[str|None] - 每个文件的处理结果, + 顺序与 get_download_urls 返回的列表一致。 + 成功为文件存储位置,失败为 None。 + 任务内重复URL的结果继承首次出现的结果 + @param success_count: 成功数(含去重缓存命中) + @param fail_count: 下载失败数(重试耗尽) + @param skipped_count: 跳过数(无效URL、get_file_path异常等) + @param dup_count: 任务内重复URL数 + @param total_count: 总数(success + fail + skipped + dup = total) + """ + pass + + # ===================== 框架内部方法 ===================== + + # Lua 脚本: 原子操作 - 轮次校验 + 幂等写入结果 + 递增计数 + 设置TTL + 检查完成 + # KEYS[1]=progress_key KEYS[2]=result_key + # ARGV[1]=field("success"/"fail") ARGV[2]=file_index ARGV[3]=result_value ARGV[4]=run_id + # 返回值: {status, total, success, fail, skipped, dup} + # status: -1=key不存在或run_id不匹配(过期回调), 0=未完成, 1=首次完成 + _LUA_RECORD_AND_CHECK = """ +if redis.call('exists', KEYS[1]) == 0 then + return {-1, 0, 0, 0, 0, 0} +end +if redis.call('hget', KEYS[1], 'run_id') ~= ARGV[4] then + return {-1, 0, 0, 0, 0, 0} +end +local is_new = redis.call('hsetnx', KEYS[2], ARGV[2], ARGV[3]) +if is_new == 1 then + redis.call('hincrby', KEYS[1], ARGV[1], 1) +end +redis.call('expire', KEYS[2], 86400) +redis.call('expire', KEYS[1], 86400) +local total = tonumber(redis.call('hget', KEYS[1], 'total')) or 0 +local success = tonumber(redis.call('hget', KEYS[1], 'success')) or 0 +local fail = tonumber(redis.call('hget', KEYS[1], 'fail')) or 0 +local skipped = tonumber(redis.call('hget', KEYS[1], 'skipped')) or 0 +local dup = tonumber(redis.call('hget', KEYS[1], 'dup')) or 0 +if success + fail + skipped + dup >= total and total > 0 then + local done = redis.call('hsetnx', KEYS[1], 'done', 1) + if done == 1 then + return {1, total, success, fail, skipped, dup} + end +end +return {0, total, success, fail, skipped, dup} +""" + + def _record_and_check_done(self, progress_key, result_key, field, file_index, result_value, run_id): + """原子操作: 轮次校验 + 幂等写入结果 + 递增计数 + 检查完成 + run_id 不匹配时视为过期回调直接丢弃,防止跨轮次数据污染。 + 同一 file_index 仅首次写入时递增计数器。 + @return: (status, total, success, fail, skipped, dup) + status: -1=key不存在或run_id不匹配(过期回调), 0=未完成, 1=首次完成 + """ + try: + result = self._redisdb._redis.evalsha( + self._lua_record_and_check_sha, 2, + progress_key, result_key, field, file_index, result_value, run_id, + ) + except NoScriptError: + self._lua_record_and_check_sha = self._redisdb._redis.script_load( + self._LUA_RECORD_AND_CHECK + ) + result = self._redisdb._redis.evalsha( + self._lua_record_and_check_sha, 2, + progress_key, result_key, field, file_index, result_value, run_id, + ) + return result[0], result[1], result[2], result[3], result[4], result[5] + + def start_requests(self, task): + """ + 遍历 URL 列表生成下载请求。 + - 任务内重复 URL 
自动去重,结果继承首次出现的下载结果 + - 跨任务去重缓存命中的 URL 直接复用结果,不生成 Request + - 先在本地收集所有结果,通过 pipeline 一次性写入 Redis, + 再 yield Request,避免 worker 线程与初始化之间的竞态 + """ + urls = self.get_download_urls(task) + if isinstance(urls, str): + raise TypeError(f"get_download_urls应返回列表, 实际返回了字符串: {urls[:100]}") + if not urls: + log.warning(f"任务{task.id}无下载URL") + try: + for item in self.on_task_all_done(task, [], 0, 0, 0, 0, 0) or []: + yield item + except Exception as e: + log.error(f"任务{task.id} on_task_all_done异常 error={e}") + log.warning(f"任务{task.id} 状态未更新, 请检查on_task_all_done实现") + return + + total = len(urls) + task_id = task.id + progress_key = setting.TAB_FILE_PROGRESS.format( + redis_key=self._redis_key, task_id=task_id + ) + result_key = setting.TAB_FILE_RESULT.format( + redis_key=self._redis_key, task_id=task_id + ) + + run_id = os.urandom(8).hex() + + cached_count = 0 + skipped_count = 0 + dup_count = 0 + result_mapping = {} + dup_to_source = {} + seen_urls = {} + pending_requests = [] + + for index, url in enumerate(urls): + if not url or not isinstance(url, str) or not url.strip(): + result_mapping[str(index)] = "" + skipped_count += 1 + log.warning(f"任务{task_id} 跳过无效URL index={index}") + continue + + url = url.strip() + if url in seen_urls: + dup_to_source[index] = seen_urls[url] + dup_count += 1 + log.debug(f"任务{task_id} URL任务内去重 index={index} -> {seen_urls[url]}") + continue + seen_urls[url] = index + + if self._file_dedup: + try: + cached_result = self._file_dedup.get(url) + except Exception as e: + log.error(f"任务{task_id} 去重缓存查询异常 url={url} error={e}") + cached_result = None + if cached_result is not None: + result_mapping[str(index)] = cached_result + cached_count += 1 + log.debug(f"任务{task_id} 文件去重命中 url={url}") + try: + self.on_file_downloaded(task_id, url, cached_result) + except Exception as e: + log.error(f"任务{task_id} on_file_downloaded回调异常 url={url} error={e}") + continue + + try: + file_path = self.get_file_path(task, url, index) + except Exception as e: + result_mapping[str(index)] = "" + skipped_count += 1 + log.error(f"任务{task_id} get_file_path异常 url={url} error={e}") + continue + + pending_requests.append( + Request( + url, + task_id=task_id, + file_index=index, + file_path=file_path, + task=task, + run_id=run_id, + callback=self.save_file, + ) + ) + + # 清理旧 key 并通过 pipeline 原子写入初始状态 + dup_key = setting.TAB_FILE_DUP.format( + redis_key=self._redis_key, task_id=task_id + ) + pipe = self._redisdb._redis.pipeline() + pipe.delete(progress_key) + pipe.delete(result_key) + pipe.delete(dup_key) + progress_fields = { + "total": total, "success": cached_count, + "fail": 0, "skipped": skipped_count, "dup": dup_count, + "run_id": run_id, + } + for field, value in progress_fields.items(): + pipe.hset(progress_key, field, value) + pipe.expire(progress_key, 86400) + if result_mapping: + for field, value in result_mapping.items(): + pipe.hset(result_key, field, value) + pipe.expire(result_key, 86400) + if dup_to_source: + for dup_idx, src_idx in dup_to_source.items(): + pipe.hset(dup_key, str(dup_idx), str(src_idx)) + pipe.expire(dup_key, 86400) + pipe.execute() + + if dup_count > 0: + log.info(f"任务{task_id} 任务内URL去重{dup_count}个") + if cached_count > 0: + log.info(f"任务{task_id} 去重缓存命中{cached_count}/{total}个文件") + + # 全部命中缓存/跳过/去重,直接触发 on_task_all_done + if cached_count + skipped_count + dup_count >= total: + try: + result = self._assemble_results(task_id, total) + for item in self.on_task_all_done( + task, result, cached_count, 0, skipped_count, dup_count, total + ) or []: + yield item + except Exception 
as e: + log.error(f"任务{task_id} on_task_all_done异常 error={e}") + log.warning(f"任务{task_id} 状态未更新, 请检查on_task_all_done实现") + finally: + yield lambda: self._cleanup_task_redis(task_id) + return + + # Redis 状态就绪后再下发请求 + for request in pending_requests: + yield request + + def save_file(self, request, response): + """ + 框架内部回调,处理文件保存和进度追踪。用户不应重写此方法。 + """ + task_id = request.task_id + file_index = request.file_index + url = request.url + file_path = request.file_path + run_id = getattr(request, "run_id", "") + + try: + result_url = self.process_file(task_id, url, file_path, response) + except Exception as e: + log.error(f"任务{task_id} process_file异常 url={url} error={e}") + raise + + if not result_url: + raise Exception(f"process_file返回空值 url={url}, 请检查实现是否正确返回了文件存储位置") + + # 写入去重缓存(异常不影响主流程) + if self._file_dedup and result_url: + try: + self._file_dedup.set(url, result_url) + except Exception as e: + log.error(f"任务{task_id} 去重缓存写入异常 url={url} error={e}") + + # 原子操作: 轮次校验 + 幂等写入结果 + 递增计数 + 检查完成 + progress_key = setting.TAB_FILE_PROGRESS.format( + redis_key=self._redis_key, task_id=task_id + ) + result_key = setting.TAB_FILE_RESULT.format( + redis_key=self._redis_key, task_id=task_id + ) + status, total, success, fail, skipped, dup = self._record_and_check_done( + progress_key, result_key, "success", str(file_index), result_url or "", run_id, + ) + + if status == -1: + log.debug(f"任务{task_id} 过期回调已丢弃 url={url}") + return + + log.info(f"任务{task_id} 文件下载成功 [{success + fail + skipped + dup}/{total}] url={url}") + + try: + self.on_file_downloaded(task_id, url, result_url) + except Exception as e: + log.error(f"任务{task_id} on_file_downloaded回调异常 url={url} error={e}") + + if status == 1: + task = request.task + try: + result = self._assemble_results(task_id, total) + for item in self.on_task_all_done( + task, result, success, fail, skipped, dup, total + ) or []: + yield item + except Exception as e: + log.error(f"任务{task_id} on_task_all_done异常 error={e}") + log.warning(f"任务{task_id} 状态未更新, 请检查on_task_all_done实现") + finally: + yield lambda: self._cleanup_task_redis(task_id) + + def failed_request(self, request, response, e): + """ + 文件下载失败(重试耗尽)的处理。 + """ + task_id = getattr(request, "task_id", None) + file_index = getattr(request, "file_index", None) + + if task_id is None or file_index is None: + yield request + return + + run_id = getattr(request, "run_id", "") + + # 原子操作: 轮次校验 + 幂等写入结果 + 递增计数 + 检查完成 + progress_key = setting.TAB_FILE_PROGRESS.format( + redis_key=self._redis_key, task_id=task_id + ) + result_key = setting.TAB_FILE_RESULT.format( + redis_key=self._redis_key, task_id=task_id + ) + status, total, success, fail, skipped, dup = self._record_and_check_done( + progress_key, result_key, "fail", str(file_index), "", run_id, + ) + + if status == -1: + log.debug(f"任务{task_id} 过期回调已丢弃 url={request.url}") + return + + log.error(f"任务{task_id} 文件下载失败 [{success + fail + skipped + dup}/{total}] url={request.url}") + + try: + self.on_file_failed(task_id, request.url, e) + except Exception as e_cb: + log.error(f"任务{task_id} on_file_failed回调异常 url={request.url} error={e_cb}") + + if status == 1: + task = request.task + try: + result = self._assemble_results(task_id, total) + for item in self.on_task_all_done( + task, result, success, fail, skipped, dup, total + ) or []: + yield item + except Exception as e_done: + log.error(f"任务{task_id} on_task_all_done异常 error={e_done}") + log.warning(f"任务{task_id} 状态未更新, 请检查on_task_all_done实现") + finally: + yield lambda: self._cleanup_task_redis(task_id) + + yield 
request + + def _assemble_results(self, task_id, total): + """ + 从 Redis 中拉取文件处理结果和任务内重复映射, + 按 0~total-1 顺序组装为有序列表,重复索引继承首次出现的结果。 + 使用 hscan_iter 分批读取,避免超大任务时 hgetall 的内存峰值。 + """ + result_key = setting.TAB_FILE_RESULT.format( + redis_key=self._redis_key, task_id=task_id + ) + all_data = {} + for k, v in self._redisdb._redis.hscan_iter(result_key, count=1000): + key = k.decode() if isinstance(k, bytes) else k + val = v.decode() if isinstance(v, bytes) else v + all_data[key] = val + result = [all_data.get(str(i)) or None for i in range(total)] + + dup_key = setting.TAB_FILE_DUP.format( + redis_key=self._redis_key, task_id=task_id + ) + for dup_idx_raw, src_idx_raw in self._redisdb._redis.hscan_iter(dup_key, count=1000): + dup_idx = int(dup_idx_raw.decode() if isinstance(dup_idx_raw, bytes) else dup_idx_raw) + src_idx = int(src_idx_raw.decode() if isinstance(src_idx_raw, bytes) else src_idx_raw) + result[dup_idx] = result[src_idx] + + return result + + def _cleanup_task_redis(self, task_id): + """清理任务相关的 Redis 进度、结果和重复映射 key""" + progress_key = setting.TAB_FILE_PROGRESS.format( + redis_key=self._redis_key, task_id=task_id + ) + result_key = setting.TAB_FILE_RESULT.format( + redis_key=self._redis_key, task_id=task_id + ) + dup_key = setting.TAB_FILE_DUP.format( + redis_key=self._redis_key, task_id=task_id + ) + self._redisdb.clear(progress_key) + self._redisdb.clear(result_key) + self._redisdb.clear(dup_key) + + def close(self): + """释放文件去重缓存资源""" + if self._file_dedup: + try: + self._file_dedup.close() + except Exception as e: + log.error(f"文件去重缓存关闭异常 error={e}") + + @classmethod + def to_DebugFileSpider(cls, *args, **kwargs): + DebugFileSpider.__bases__ = (cls,) + DebugFileSpider.__name__ = cls.__name__ + return DebugFileSpider(*args, **kwargs) + + +class DebugFileSpider(FileSpider): + """ + Debug 文件下载爬虫 + """ + + __debug_custom_setting__ = dict( + COLLECTOR_TASK_COUNT=1, + SPIDER_THREAD_COUNT=1, + SPIDER_SLEEP_TIME=0, + SPIDER_MAX_RETRY_TIMES=10, + REQUEST_LOST_TIMEOUT=600, + PROXY_ENABLE=False, + RETRY_FAILED_REQUESTS=False, + SAVE_FAILED_REQUEST=False, + ITEM_FILTER_ENABLE=False, + REQUEST_FILTER_ENABLE=False, + OSS_UPLOAD_TABLES=(), + DELETE_KEYS=True, + ) + + def __init__( + self, + task_id=None, + task=None, + save_to_db=False, + update_task=False, + *args, + **kwargs, + ): + """ + @param task_id: 任务 id + @param task: 任务,task 与 task_id 二者选一即可。如 task = {"url":""} + @param save_to_db: 数据是否入库,默认否 + @param update_task: 是否更新任务,默认否 + """ + warnings.warn( + "您正处于debug模式下,该模式下不会更新任务状态及数据入库,仅用于调试。" + "正式发布前请更改为正常模式", + category=Warning, + ) + + if not task and not task_id: + raise Exception("task_id 与 task 不能同时为空") + + kwargs["redis_key"] = kwargs["redis_key"] + "_debug" + if not save_to_db: + self.__class__.__debug_custom_setting__["ITEM_PIPELINES"] = [ + CONSOLE_PIPELINE_PATH + ] + self.__class__.__custom_setting__.update( + self.__class__.__debug_custom_setting__ + ) + + super(DebugFileSpider, self).__init__(*args, **kwargs) + + self._task_id = task_id + self._task = task + self._update_task = update_task + + def start_monitor_task(self): + if not self._parsers: + self._is_more_parsers = False + self._parsers.append(self) + elif len(self._parsers) <= 1: + self._is_more_parsers = False + + if self._task: + self.distribute_task([self._task]) + else: + tasks = self.get_todo_task_from_mysql() + if not tasks: + raise Exception( + f"未获取到任务 请检查 task_id: {self._task_id} 是否存在" + ) + self.distribute_task(tasks) + + log.debug("下发任务完毕") + + def get_todo_task_from_mysql(self): + task_keys = ", 
".join([f"`{key}`" for key in self._task_keys]) + sql = "select %s from %s where id=%s" % ( + task_keys, + self._task_table, + self._task_id, + ) + tasks = self._mysqldb.find(sql) + return tasks + + def save_cached(self, request, response, table): + pass + + def update_task_state(self, task_id, state=1, *args, **kwargs): + if self._update_task: + kwargs["id"] = task_id + kwargs[self._task_state] = state + + sql = tools.make_update_sql( + self._task_table, + kwargs, + condition=f"id = {task_id}", + ) + + if self._mysqldb.update(sql): + log.debug(f"置任务{task_id}状态成功") + else: + log.error(f"置任务{task_id}状态失败 sql={sql}") + + def update_task_batch(self, task_id, state=1, *args, **kwargs): + if self._update_task: + kwargs["id"] = task_id + kwargs[self._task_state] = state + + update_item = UpdateItem(**kwargs) + update_item.table_name = self._task_table + update_item.name_underline = self._task_table + "_item" + + return update_item + + def run(self): + self.start_monitor_task() + + if not self._parsers: + self._parsers.append(self) + + self._start() + + while True: + try: + if self.all_thread_is_done(): + self._stop_all_thread() + break + except Exception as e: + log.exception(e) + + tools.delay_time(1) + + self.delete_tables([self._redis_key + "*"]) diff --git a/feapder/db/redisdb.py b/feapder/db/redisdb.py index d882e687..97b7d943 100644 --- a/feapder/db/redisdb.py +++ b/feapder/db/redisdb.py @@ -137,15 +137,14 @@ def get_connect(self): else self._ip_ports.split(",") ) if len(ip_ports) > 1: - startup_nodes = [] + parsed_nodes = [] for ip_port in ip_ports: ip, port = ip_port.split(":") - startup_nodes.append({"host": ip, "port": port}) + parsed_nodes.append((ip, int(port))) if self._service_name: # log.debug("使用redis哨兵模式") - hosts = [(node["host"], node["port"]) for node in startup_nodes] - sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs) + sentinel = Sentinel(parsed_nodes, socket_timeout=3, **self._kwargs) self._redis = sentinel.master_for( self._service_name, password=self._user_pass, @@ -158,10 +157,17 @@ def get_connect(self): else: try: - from rediscluster import RedisCluster - except ModuleNotFoundError as e: - log.error('请安装 pip install "feapder[all]"') - os._exit(0) + from redis.cluster import RedisCluster, ClusterNode + startup_nodes = [ClusterNode(host=ip, port=port) for ip, port in parsed_nodes] + except ModuleNotFoundError: + try: + from rediscluster import RedisCluster + startup_nodes = [{"host": ip, "port": port} for ip, port in parsed_nodes] + except ModuleNotFoundError: + log.error( + '请安装 pip install "feapder[all]",或升级 redis>=4.0,或安装 redis-py-cluster' + ) + os._exit(0) # log.debug("使用redis集群模式") self._redis = RedisCluster( diff --git a/feapder/dedup/file_dedup.py b/feapder/dedup/file_dedup.py new file mode 100644 index 00000000..5616cad2 --- /dev/null +++ b/feapder/dedup/file_dedup.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +""" +文件去重缓存 + +与现有 Dedup(布隆过滤器等,只判断存在性)不同, +FileDedup 存储 URL -> result_url 的完整映射,用于直接复用下载结果。 +""" + +import hashlib + +from feapder.db.mysqldb import MysqlDB +from feapder.db.redisdb import RedisDB +from feapder.utils.log import log + + +class FileDedup: + """文件去重缓存接口 + + 用于存储和检索文件下载结果的缓存。 + 子类需实现 get / set 方法。 + """ + + def get(self, url): + """获取 URL 对应的缓存结果 + + Args: + url: 文件原始 URL + + Returns: + str or None: 缓存的文件存储位置,无缓存返回 None + """ + return None + + def set(self, url, result_url): + """缓存 URL 的处理结果 + + Args: + url: 文件原始 URL + result_url: 文件最终存储位置(本地路径或云存储 URL) + """ + pass + + def close(self): + pass + + +class 
RedisFileDedup(FileDedup): + """基于 Redis Hash 的文件去重 + + 适合分布式场景,多进程共享。 + """ + + def __init__(self, table, expire_time=None): + """ + Args: + table: Redis Hash 的 key + expire_time: 过期时间(秒),None 表示不过期 + """ + self._redisdb = RedisDB() + self._table = table + self._expire_time = expire_time + + def get(self, url): + result = self._redisdb.hget(self._table, url) + if result is None: + return None + if isinstance(result, bytes): + result = result.decode() + return result or None + + def set(self, url, result_url): + self._redisdb.hset(self._table, url, result_url) + if self._expire_time: + self._redisdb._redis.expire(self._table, self._expire_time) + + +class MysqlFileDedup(FileDedup): + """基于 MySQL 表的文件去重 + + 持久化可靠,适合长期缓存。 + 首次使用时会自动建表。 + """ + + _table_ensured = set() + + def __init__(self, table="file_dedup", mysqldb=None): + """ + Args: + table: MySQL 表名 + mysqldb: MysqlDB 实例,默认使用全局配置 + """ + self._mysqldb = mysqldb or MysqlDB() + self._table = table + self._ensure_table() + + def _ensure_table(self): + if self._table in self.__class__._table_ensured: + return + sql = ( + f"CREATE TABLE IF NOT EXISTS `{self._table}` (" + f" `id` int(11) NOT NULL AUTO_INCREMENT," + f" `url` text NOT NULL COMMENT '文件原始URL'," + f" `url_hash` char(32) NOT NULL COMMENT 'URL的MD5哈希'," + f" `result_url` text COMMENT '文件存储位置'," + f" `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP," + f" PRIMARY KEY (`id`)," + f" UNIQUE KEY `uk_url_hash` (`url_hash`) USING BTREE" + f") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" + ) + self._mysqldb.execute(sql) + self.__class__._table_ensured.add(self._table) + + @staticmethod + def _hash_url(url): + return hashlib.md5(url.encode()).hexdigest() + + def get(self, url): + url_hash = self._hash_url(url) + sql = f"SELECT result_url FROM `{self._table}` WHERE `url_hash` = %s LIMIT 1" + result = self._mysqldb.find(sql, (url_hash,)) + return result[0][0] if result else None + + def set(self, url, result_url): + url_hash = self._hash_url(url) + sql = ( + f"INSERT INTO `{self._table}` (`url`, `url_hash`, `result_url`) VALUES (%s, %s, %s) " + f"ON DUPLICATE KEY UPDATE `result_url` = VALUES(`result_url`)" + ) + self._mysqldb.execute(sql, (url, url_hash, result_url)) diff --git a/feapder/requirements.txt b/feapder/requirements.txt index 21717674..a882ee80 100644 --- a/feapder/requirements.txt +++ b/feapder/requirements.txt @@ -4,13 +4,12 @@ parsel>=1.5.2 PyExecJS>=1.5.1 pymongo>=3.10.1 PyMySQL>=0.9.3 -redis>=2.10.6,<4.0.0 +redis>=2.10.6,<6.0.0 requests>=2.22.0 selenium>=3.141.0 bs4>=0.0.1 ipython>=7.14.0 bitarray>=1.5.3 -redis-py-cluster>=2.1.0 cryptography>=3.3.2 urllib3>=1.25.8 loguru>=0.5.3 @@ -18,4 +17,4 @@ influxdb>=5.3.1 pyperclip>=1.8.2 webdriver-manager>=4.0.0 terminal-layout>=2.1.3 -playwright \ No newline at end of file +playwright diff --git a/feapder/setting.py b/feapder/setting.py index c52b318c..4b5b6747 100644 --- a/feapder/setting.py +++ b/feapder/setting.py @@ -11,6 +11,14 @@ TAB_FAILED_ITEMS = "{redis_key}:s_failed_items" # 爬虫状态表模版 TAB_SPIDER_STATUS = "{redis_key}:h_spider_status" +# 文件爬虫 - 进度追踪 +TAB_FILE_PROGRESS = "{redis_key}:h_file_progress:{task_id}" +# 文件爬虫 - 文件结果 +TAB_FILE_RESULT = "{redis_key}:h_file_result:{task_id}" +# 文件爬虫 - 任务内重复URL映射 +TAB_FILE_DUP = "{redis_key}:h_file_dup:{task_id}" +# 文件爬虫 - 去重缓存 +TAB_FILE_DEDUP = "{redis_key}:h_file_dedup" # 用户池 TAB_USER_POOL = "{redis_key}:h_{user_type}_pool" @@ -193,7 +201,8 @@ # 时间间隔 WARNING_INTERVAL = 3600 # 相同报警的报警时间间隔,防止刷屏; 0表示不去重 WARNING_LEVEL = "DEBUG" # 报警级别, DEBUG / INFO / ERROR -WARNING_FAILED_COUNT = 1000 # 
任务失败数 超过WARNING_FAILED_COUNT则报警 +WARNING_FAILED_COUNT = 1000 # 失败请求数 超过WARNING_FAILED_COUNT则报警 +WARNING_SUCCESS_RATE = 0.5 # 请求成功率低于WARNING_SUCCESS_RATE则报警 WARNING_CHECK_TASK_COUNT_INTERVAL = 1200 # 检查已做任务数量的时间间隔,若两次时间间隔之间,任务数无变化则报警 # 日志 diff --git a/feapder/templates/file_spider_template.tmpl b/feapder/templates/file_spider_template.tmpl new file mode 100644 index 00000000..4c261233 --- /dev/null +++ b/feapder/templates/file_spider_template.tmpl @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +""" +Created on {DATE} +--------- +@summary: +--------- +@author: {USER} +""" + +import json + +import feapder +from feapder import ArgumentParser + + +class ${spider_name}(feapder.FileSpider): + # 自定义数据库,若项目中有setting.py文件,此自定义可删除 + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + MYSQL_IP="localhost", + MYSQL_PORT=3306, + MYSQL_DB="", + MYSQL_USER_NAME="", + MYSQL_USER_PASS="", + ) + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + # 任务状态需在此显式更新;实现需保证幂等,异常可能触发任务重试 + # fail_count == 0 且有实际成功下载则标记完成;全部跳过或无有效URL标记失败 + if fail_count == 0 and success_count > 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) + + +if __name__ == "__main__": + spider = ${spider_name}( + redis_key="xxx:xxx", # 分布式爬虫调度信息存储位置 + task_table="", # mysql中的任务表 + task_keys=["id", "file_urls"], # 需要获取任务表里的字段名,可添加多个 + task_state="state", # mysql中任务状态字段 + save_dir="./downloads", # 文件保存根目录 + # file_dedup="redis", # 跨任务去重策略: None / "redis" / "mysql" + ) + + parser = ArgumentParser(description="${spider_name}爬虫") + + parser.add_argument( + "--start_master", + action="store_true", + help="添加任务", + function=spider.start_monitor_task, + ) + parser.add_argument( + "--start_worker", action="store_true", help="启动爬虫", function=spider.start + ) + + parser.start() + + # 启动方式一:单进程(调试方便) + # spider.start() + + # 启动方式二:分离 master/worker(生产推荐) + # uv run ${file_name} --start_master # 仅负责派发任务 + # uv run ${file_name} --start_worker # 仅负责消费下载 diff --git a/feapder/templates/project_template/setting.py b/feapder/templates/project_template/setting.py index 140aaa07..890ec9d1 100644 --- a/feapder/templates/project_template/setting.py +++ b/feapder/templates/project_template/setting.py @@ -175,7 +175,8 @@ # # 时间间隔 # WARNING_INTERVAL = 3600 # 相同报警的报警时间间隔,防止刷屏; 0表示不去重 # WARNING_LEVEL = "DEBUG" # 报警级别, DEBUG / INFO / ERROR -# WARNING_FAILED_COUNT = 1000 # 任务失败数 超过WARNING_FAILED_COUNT则报警 +# WARNING_FAILED_COUNT = 1000 # 失败请求数 超过WARNING_FAILED_COUNT则报警 +# WARNING_SUCCESS_RATE = 0.5 # 请求成功率低于WARNING_SUCCESS_RATE则报警 # # LOG_NAME = os.path.basename(os.getcwd()) # LOG_PATH = "log/%s.log" % LOG_NAME # log存储路径 diff --git a/setup.py b/setup.py index cf4fe542..70baf38c 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ "DBUtils>=2.0", "parsel>=1.5.2", "PyMySQL>=0.9.3", - "redis>=2.10.6,<4.0.0", + "redis>=2.10.6,<6.0.0", "requests>=2.22.0", "bs4>=0.0.1", "ipython>=7.14.0", @@ -60,7 +60,6 @@ "bitarray>=1.5.3", "PyExecJS>=1.5.1", "pymongo>=3.10.1", - "redis-py-cluster>=2.1.0", ] + render_requires setuptools.setup( diff --git a/tests/file-spider/table.sql b/tests/file-spider/table.sql new file mode 100644 index 00000000..772d4e39 --- /dev/null +++ b/tests/file-spider/table.sql @@ -0,0 +1,20 @@ +-- FileSpider 任务表 +CREATE TABLE IF NOT EXISTS `file_task` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `file_urls` text COMMENT 
'待下载文件URL列表,JSON数组格式', + `state` int(11) DEFAULT 0 COMMENT '任务状态: 0待做 2下载中 1完成 -1失败', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- 结果表(场景三使用) +CREATE TABLE IF NOT EXISTS `file_result` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `task_id` int(11) DEFAULT NULL COMMENT '任务ID', + `result_urls` text COMMENT '文件存储位置列表,JSON数组,与file_urls位置对应', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- 示例数据 +INSERT INTO `file_task` (`file_urls`, `state`) VALUES +('["https://httpbin.org/image/png", "https://httpbin.org/image/jpeg"]', 0), +('["https://httpbin.org/image/svg", "https://httpbin.org/image/webp", "https://httpbin.org/image/png"]', 0); diff --git a/tests/file-spider/test_dedup_file_spider.py b/tests/file-spider/test_dedup_file_spider.py new file mode 100644 index 00000000..ca64ab5f --- /dev/null +++ b/tests/file-spider/test_dedup_file_spider.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +""" +场景四:启用文件去重 + +通过 file_dedup 参数启用去重,同一 URL 跨任务不重复下载。 + +去重行为: +- start_requests 中遍历 URL 列表时,先查去重缓存 +- 缓存命中:直接复用已有结果,不生成 Request,不重复下载 +- 缓存未命中:正常下载,成功后自动写入去重缓存 +- 跨任务共享:不同任务中出现的相同 URL 只下载一次 +""" + +import json + +import feapder +from feapder.utils.log import log + + +class DedupFileSpider(feapder.FileSpider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + MYSQL_IP="localhost", + MYSQL_PORT=3306, + MYSQL_DB="feapder", + MYSQL_USER_NAME="feapder", + MYSQL_USER_PASS="feapder123", + ) + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def on_file_downloaded(self, task_id, url, file_path): + log.info(f"任务{task_id} 文件就绪 path={file_path}") + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + log.info(f"任务{task.id} 完成 成功={success_count} 失败={fail_count}") + yield self.update_task_batch(task.id, 1 if fail_count == 0 and success_count > 0 else -1) + + +if __name__ == "__main__": + spider = DedupFileSpider( + redis_key="dedup_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + save_dir="./downloads", + file_dedup="redis", # "redis" / "mysql" / FileDedup 实例 + ) + spider.start() diff --git a/tests/file-spider/test_local_file_spider.py b/tests/file-spider/test_local_file_spider.py new file mode 100644 index 00000000..3c2a4127 --- /dev/null +++ b/tests/file-spider/test_local_file_spider.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +""" +场景一:保存到本地磁盘 + +最简单的用法,下载文件保存到本地。 +任务表结构见 table.sql +""" + +import json + +import feapder +from feapder.utils.log import log + + +class LocalFileSpider(feapder.FileSpider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + MYSQL_IP="localhost", + MYSQL_PORT=3306, + MYSQL_DB="feapder", + MYSQL_USER_NAME="feapder", + MYSQL_USER_PASS="feapder123", + ) + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def on_file_downloaded(self, task_id, url, file_path): + log.info(f"任务{task_id} 文件保存成功 path={file_path}") + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + if fail_count == 0 and success_count > 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) + + +if __name__ == "__main__": + spider = LocalFileSpider( + redis_key="local_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + save_dir="./downloads", + ) + spider.start() diff --git a/tests/file-spider/test_oss_file_spider.py 
b/tests/file-spider/test_oss_file_spider.py new file mode 100644 index 00000000..9811d9cc --- /dev/null +++ b/tests/file-spider/test_oss_file_spider.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +""" +场景二:上传云存储(不落盘) + +重写 process_file 实现直接上传云存储,文件不保存到本地磁盘。 +""" + +import json +import os +from urllib.parse import urlparse, unquote + +import feapder +from feapder.utils.log import log + + +class OssFileSpider(feapder.FileSpider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + MYSQL_IP="localhost", + MYSQL_PORT=3306, + MYSQL_DB="feapder", + MYSQL_USER_NAME="feapder", + MYSQL_USER_PASS="feapder123", + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # 初始化云存储客户端 + # self.oss_client = OSSClient(bucket="my-bucket") + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def get_file_path(self, task, url, index): + """返回 OSS 存储 key(不是本地路径)""" + filename = os.path.basename(unquote(urlparse(url).path)) + return f"files/{task.id}/{index}_{filename}" + + def process_file(self, task_id, url, file_path, response): + """上传到 OSS,返回云存储 URL""" + # self.oss_client.put_object(file_path, response.content) + cloud_url = f"https://my-bucket.oss.aliyuncs.com/{file_path}" + log.info(f"任务{task_id} 上传成功 url={cloud_url}") + return cloud_url + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + log.info(f"任务{task.id} 完成 成功={success_count} 失败={fail_count}") + if fail_count == 0 and success_count > 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) + + +if __name__ == "__main__": + spider = OssFileSpider( + redis_key="oss_file_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + ) + spider.start() diff --git a/tests/file-spider/test_oss_result_spider.py b/tests/file-spider/test_oss_result_spider.py new file mode 100644 index 00000000..d8a91838 --- /dev/null +++ b/tests/file-spider/test_oss_result_spider.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +""" +场景三:上传云存储 + 结果入库 + +下载文件上传到云存储后,将有序的云存储 URL 列表组装成 Item 写入结果表。 + +使用前先创建结果 Item: + feapder create -i file_result + +然后编辑 items/file_result_item.py 添加 task_id、result_urls 字段。 +""" + +import json +import os +from urllib.parse import urlparse, unquote + +import feapder +from feapder import ArgumentParser +from feapder.network.item import Item +from feapder.utils.log import log + + +class FileResultItem(Item): + """ + 结果表 Item(实际项目中应通过 feapder create -i 生成) + 对应的 MySQL 表: + CREATE TABLE `file_result` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `task_id` int(11) DEFAULT NULL, + `result_urls` text COMMENT '云存储URL列表,JSON数组', + PRIMARY KEY (`id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.table_name = "file_result" + self.task_id = None + self.result_urls = None + + +class OssResultSpider(feapder.FileSpider): + __custom_setting__ = dict( + REDISDB_IP_PORTS="localhost:6379", + REDISDB_USER_PASS="", + REDISDB_DB=0, + MYSQL_IP="localhost", + MYSQL_PORT=3306, + MYSQL_DB="feapder", + MYSQL_USER_NAME="feapder", + MYSQL_USER_PASS="feapder123", + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # self.oss_client = OSSClient(bucket="my-bucket") + + def get_download_urls(self, task): + return json.loads(task.file_urls) + + def get_file_path(self, task, url, index): + filename = os.path.basename(unquote(urlparse(url).path)) + return 
f"files/{task.id}/{index}_{filename}" + + def process_file(self, task_id, url, file_path, response): + # self.oss_client.put_object(file_path, response.content) + return f"https://my-bucket.oss.aliyuncs.com/{file_path}" + + def on_task_all_done(self, task, result, success_count, fail_count, skipped_count, dup_count, total_count): + # result 与 get_download_urls 返回的列表严格位置对应 + # 例: ["https://oss.com/a.jpg", "https://oss.com/b.jpg", None, "https://oss.com/d.jpg"] + log.info( + f"任务{task.id} 完成 成功={success_count} 失败={fail_count} " + f"跳过={skipped_count} 去重={dup_count}" + ) + + # 组装结果 Item 写入结果表 + item = FileResultItem() + item.task_id = task.id + item.result_urls = result + yield item + + # 更新任务状态 + if fail_count == 0: + yield self.update_task_batch(task.id, 1) + else: + yield self.update_task_batch(task.id, -1) + + +if __name__ == "__main__": + spider = OssResultSpider( + redis_key="oss_result_spider", + task_table="file_task", + task_keys=["id", "file_urls"], + ) + + parser = ArgumentParser(description="OssResultSpider 文件下载爬虫") + parser.add_argument( + "--start_master", + action="store_true", + help="添加任务", + function=spider.start_monitor_task, + ) + parser.add_argument( + "--start_worker", + action="store_true", + help="启动爬虫", + function=spider.start, + ) + parser.start()