From a6aef3d21e47483644fa22b549135e0c2e85d666 Mon Sep 17 00:00:00 2001 From: android-dev1 Date: Sat, 11 Apr 2026 14:27:11 +0800 Subject: [PATCH] perf: optimize email fetching with SSE streaming, channel cache, and concurrent IMAP - Skip Graph API for accounts without Mail.Read permission (check token scope) - Cache account channel (graph/imap) in memory with 1h TTL to avoid repeated probing - Add server-side email list cache with 2h TTL to eliminate redundant fetches - SSE streaming endpoint (/api/emails//stream) for progressive email rendering - Concurrent IMAP: try both outlook.live.com and outlook.office365.com simultaneously - Batch IMAP FETCH: single request for all messages instead of N sequential fetches - Gunicorn: switch to gthread worker with 8 threads for concurrent request handling - Fix cached method name matching (use includes() instead of strict equality) Co-Authored-By: Claude Opus 4.6 (1M context) --- Dockerfile | 6 +- outlook_web/controllers/emails.py | 310 +++++++++++++++++--------- outlook_web/routes/emails.py | 5 + outlook_web/services/channel_cache.py | 43 ++++ outlook_web/services/email_cache.py | 67 ++++++ outlook_web/services/graph.py | 21 ++ outlook_web/services/imap.py | 309 ++++++++++++++++++++++++- static/js/features/emails.js | 134 ++++++----- 8 files changed, 728 insertions(+), 167 deletions(-) create mode 100644 outlook_web/services/channel_cache.py create mode 100644 outlook_web/services/email_cache.py diff --git a/Dockerfile b/Dockerfile index 44b63dc..bbfd9b0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,6 +29,6 @@ EXPOSE 5000 # 健康检查 HEALTHCHECK --interval=30s --timeout=5s --start-period=20s CMD ["python","-c","import urllib.request as u; u.urlopen('http://localhost:5000/healthz', timeout=4).read()"] -# 启动应用(使用 Gunicorn,单 worker 避免 session 共享问题) -# 注意:禁用 --preload,避免在 master 进程中启动后台调度线程 -CMD ["gunicorn", "-w", "1", "-b", "0.0.0.0:5000", "--timeout", "120", "--access-logfile", "-", "web_outlook_app:app"] +# 启动应用(单 worker + 多线程:保持调度器单实例,同时支持并发请求处理) +# 瓶颈是网络 I/O(Graph API / IMAP),GIL 在 I/O 期间释放,线程并发有效 +CMD ["gunicorn", "-w", "1", "--threads", "8", "-b", "0.0.0.0:5000", "--timeout", "120", "--access-logfile", "-", "web_outlook_app:app"] diff --git a/outlook_web/controllers/emails.py b/outlook_web/controllers/emails.py index 5b3be0c..f2c484f 100644 --- a/outlook_web/controllers/emails.py +++ b/outlook_web/controllers/emails.py @@ -3,7 +3,9 @@ import logging from typing import Any -from flask import jsonify, request +import json + +from flask import Response, jsonify, request, stream_with_context from outlook_web import config from outlook_web.audit import log_audit @@ -18,6 +20,8 @@ from outlook_web.services import external_api as external_api_service from outlook_web.services import graph as graph_service from outlook_web.services import imap as imap_service +from outlook_web.services.channel_cache import get_cached_channel, set_cached_channel +from outlook_web.services.email_cache import get_cached_emails, set_cached_emails from outlook_web.services.imap_generic import ( get_email_detail_imap_generic_result, get_emails_imap_generic, @@ -116,6 +120,16 @@ def api_get_emails(email_addr: str) -> Any: ) return jsonify(result) + # 服务端缓存(2h TTL) + cached = get_cached_emails(email_addr, folder) + if cached: + return jsonify({ + "success": True, + "emails": cached["emails"], + "method": cached["method"] + " (cached)", + "has_more": cached["has_more"], + }) + # 获取分组代理设置 proxy_url = "" if account.get("group_id"): @@ -125,143 +139,126 @@ def api_get_emails(email_addr: str) -> Any: # 收集所有错误信息 all_errors = {} + graph_result = None + + # 通道缓存:已知走 IMAP 的账号直接跳过 Graph API + cached_channel = get_cached_channel(email_addr) + + # 1. 尝试 Graph API(缓存为 imap 时跳过) + if cached_channel != "imap": + graph_result = graph_service.get_emails_graph(account["client_id"], account["refresh_token"], folder, skip, top, proxy_url) + if graph_result.get("success"): + set_cached_channel(email_addr, "graph") + emails = graph_result.get("emails", []) + account_summary = compact_summary_service.update_summary_from_message_list( + int(account["id"]), + emails, + folder=folder, + ) + # 更新刷新时间,同时保存 Microsoft 可能返回的新 refresh_token(Token Rotation) + db = get_db() + new_rt = graph_result.get("new_refresh_token") + if new_rt and new_rt != account.get("refresh_token"): + from outlook_web.security.crypto import encrypt_data as _encrypt_data - # 1. 尝试 Graph API - graph_result = graph_service.get_emails_graph(account["client_id"], account["refresh_token"], folder, skip, top, proxy_url) - if graph_result.get("success"): - emails = graph_result.get("emails", []) - account_summary = compact_summary_service.update_summary_from_message_list( - int(account["id"]), - emails, - folder=folder, - ) - # 更新刷新时间,同时保存 Microsoft 可能返回的新 refresh_token(Token Rotation) - db = get_db() - new_rt = graph_result.get("new_refresh_token") - if new_rt and new_rt != account.get("refresh_token"): - from outlook_web.security.crypto import encrypt_data as _encrypt_data - - try: - db.execute( - "UPDATE accounts SET refresh_token = ?, last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?", - (_encrypt_data(new_rt), email_addr), - ) - except Exception: + try: + db.execute( + "UPDATE accounts SET refresh_token = ?, last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?", + (_encrypt_data(new_rt), email_addr), + ) + except Exception: + db.execute( + "UPDATE accounts SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?", + (email_addr,), + ) + else: db.execute( - "UPDATE accounts SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?", + """ + UPDATE accounts + SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP + WHERE email = ? + """, (email_addr,), ) - else: - db.execute( - """ - UPDATE accounts - SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP - WHERE email = ? - """, - (email_addr,), - ) - db.commit() + db.commit() + + # 格式化 Graph API 返回的数据 + formatted = [] + for e in emails: + formatted.append( + { + "id": e.get("id"), + "subject": e.get("subject", "无主题"), + "from": e.get("from", {}).get("emailAddress", {}).get("address", "未知"), + "date": e.get("receivedDateTime", ""), + "is_read": e.get("isRead", False), + "has_attachments": e.get("hasAttachments", False), + "body_preview": e.get("bodyPreview", ""), + } + ) - # 格式化 Graph API 返回的数据 - formatted = [] - for e in emails: - formatted.append( + return jsonify( { - "id": e.get("id"), - "subject": e.get("subject", "无主题"), - "from": e.get("from", {}).get("emailAddress", {}).get("address", "未知"), - "date": e.get("receivedDateTime", ""), - "is_read": e.get("isRead", False), - "has_attachments": e.get("hasAttachments", False), - "body_preview": e.get("bodyPreview", ""), + "success": True, + "emails": formatted, + "method": "Graph API", + "has_more": len(formatted) >= top, + "account_summary": account_summary, } ) + else: + graph_error = graph_result.get("error") + all_errors["graph"] = graph_error + + # 无 Mail.Read 权限 → 缓存为 imap,后续请求直接跳过 Graph + if graph_result.get("no_mail_permission"): + set_cached_channel(email_addr, "imap") + + # 如果是代理错误,不再回退 IMAP + if isinstance(graph_error, dict) and graph_error.get("type") in ( + "ProxyError", + "ConnectionError", + ): + return build_error_response( + "EMAIL_PROXY_CONNECTION_FAILED", + "代理连接失败,请检查分组代理设置", + message_en="Proxy connection failed. Please check the group proxy settings", + err_type="ProxyError", + status=502, + details=all_errors, + extra={"details": all_errors}, + ) - return jsonify( - { - "success": True, - "emails": formatted, - "method": "Graph API", - "has_more": len(formatted) >= top, - "account_summary": account_summary, - } - ) - else: - graph_error = graph_result.get("error") - all_errors["graph"] = graph_error - - # 如果是代理错误,不再回退 IMAP - if isinstance(graph_error, dict) and graph_error.get("type") in ( - "ProxyError", - "ConnectionError", - ): - return build_error_response( - "EMAIL_PROXY_CONNECTION_FAILED", - "代理连接失败,请检查分组代理设置", - message_en="Proxy connection failed. Please check the group proxy settings", - err_type="ProxyError", - status=502, - details=all_errors, - extra={"details": all_errors}, - ) - - imap_new_result = imap_service.get_emails_imap_with_server( - account["email"], - account["client_id"], - account["refresh_token"], - folder, - skip, - top, - IMAP_SERVER_NEW, - ) - if imap_new_result.get("success"): - account_summary = compact_summary_service.update_summary_from_message_list( - int(account["id"]), - imap_new_result.get("emails", []), - folder=folder, - ) - return jsonify( - { - "success": True, - "emails": imap_new_result.get("emails", []), - "method": "IMAP (New)", - "has_more": False, # IMAP 分页暂未完全实现 - "account_summary": account_summary, - } - ) - else: - all_errors["imap_new"] = imap_new_result.get("error") - - # 3. 尝试旧版 IMAP (outlook.office365.com) - imap_old_result = imap_service.get_emails_imap_with_server( + # 2. IMAP 回退:新旧服务器并发尝试 + imap_result = imap_service.get_emails_imap_concurrent( account["email"], account["client_id"], account["refresh_token"], folder, skip, top, - IMAP_SERVER_OLD, ) - if imap_old_result.get("success"): + if imap_result.get("success"): + set_cached_channel(email_addr, "imap") account_summary = compact_summary_service.update_summary_from_message_list( int(account["id"]), - imap_old_result.get("emails", []), + imap_result.get("emails", []), folder=folder, ) return jsonify( { "success": True, - "emails": imap_old_result.get("emails", []), - "method": "IMAP (Old)", + "emails": imap_result.get("emails", []), + "method": imap_result.get("method", "IMAP"), "has_more": False, "account_summary": account_summary, } ) else: - all_errors["imap_old"] = imap_old_result.get("error") + all_errors["imap"] = imap_result.get("error") # 所有方式均失败;若 Graph API 明确返回 token 过期,优先提示重新授权 - if graph_result.get("auth_expired"): + if graph_result and graph_result.get("auth_expired"): return build_error_response( "ACCOUNT_AUTH_EXPIRED", "账号授权已失效,请前往「刷新 Token」页面重新授权", @@ -281,6 +278,103 @@ def api_get_emails(email_addr: str) -> Any: ) +@login_required +def api_stream_emails(email_addr: str) -> Any: + """SSE 流式获取邮件:IMAP 每拉到一封就推一条 event,前端边收边渲染。""" + account = accounts_repo.get_account_by_email(email_addr) + if not account: + return build_error_response( + "ACCOUNT_NOT_FOUND", "账号不存在", + message_en="Account not found", err_type="NotFoundError", + status=404, details=f"email={email_addr}", + ) + + folder = request.args.get("folder", "inbox") + skip = int(request.args.get("skip", 0)) + top = int(request.args.get("top", 20)) + force = request.args.get("force", "").lower() in ("1", "true") + + account_type = (account.get("account_type") or "outlook").strip().lower() + + # 获取分组代理设置 + proxy_url = "" + if account.get("group_id"): + group = groups_repo.get_group_by_id(account["group_id"]) + if group: + proxy_url = group.get("proxy_url", "") or "" + + def _sse(event: str, data: dict) -> str: + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + def generate(): + # --- 服务端缓存(2h TTL)--- + if not force: + cached = get_cached_emails(email_addr, folder) + if cached: + for e in cached["emails"]: + yield _sse("email", e) + yield _sse("done", { + "method": cached["method"] + " (cached)", + "count": len(cached["emails"]), + "has_more": cached["has_more"], + }) + return + + collected_emails = [] + cached_channel = get_cached_channel(email_addr) + + # --- Graph API 尝试 --- + if account_type != "imap" and cached_channel != "imap": + graph_result = graph_service.get_emails_graph( + account["client_id"], account["refresh_token"], folder, skip, top, proxy_url, + ) + if graph_result.get("success"): + set_cached_channel(email_addr, "graph") + emails = graph_result.get("emails", []) + for e in emails: + fmt = { + "id": e.get("id"), + "subject": e.get("subject", "无主题"), + "from": e.get("from", {}).get("emailAddress", {}).get("address", "未知"), + "date": e.get("receivedDateTime", ""), + "is_read": e.get("isRead", False), + "has_attachments": e.get("hasAttachments", False), + "body_preview": e.get("bodyPreview", ""), + } + collected_emails.append(fmt) + yield _sse("email", fmt) + has_more = len(emails) >= top + set_cached_emails(email_addr, folder, collected_emails, "Graph API", has_more) + yield _sse("done", {"method": "Graph API", "count": len(emails), "has_more": has_more}) + return + elif graph_result.get("no_mail_permission"): + set_cached_channel(email_addr, "imap") + + # --- IMAP 流式 --- + yield _sse("status", {"message": "IMAP connecting..."}) + imap_method = "IMAP" + for item in imap_service.stream_emails_imap( + account["email"], account["client_id"], account["refresh_token"], + folder, skip, top, + ): + if item.get("type") == "email": + collected_emails.append(item["data"]) + yield _sse("email", item["data"]) + elif item.get("type") == "done": + imap_method = item.get("method", "IMAP") + set_cached_channel(email_addr, "imap") + set_cached_emails(email_addr, folder, collected_emails, imap_method, False) + yield _sse("done", {"method": imap_method, "count": item.get("count", 0), "has_more": False}) + elif item.get("type") == "error": + yield _sse("error", item.get("data", {})) + + return Response( + stream_with_context(generate()), + mimetype="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + + @login_required def api_delete_emails() -> Any: """批量删除邮件(永久删除)""" diff --git a/outlook_web/routes/emails.py b/outlook_web/routes/emails.py index d62eb38..123c3b0 100644 --- a/outlook_web/routes/emails.py +++ b/outlook_web/routes/emails.py @@ -13,6 +13,11 @@ def create_blueprint() -> Blueprint: view_func=emails_controller.api_get_emails, methods=["GET"], ) + bp.add_url_rule( + "/api/emails//stream", + view_func=emails_controller.api_stream_emails, + methods=["GET"], + ) bp.add_url_rule( "/api/emails//extract-verification", view_func=emails_controller.api_extract_verification, diff --git a/outlook_web/services/channel_cache.py b/outlook_web/services/channel_cache.py new file mode 100644 index 0000000..90a2625 --- /dev/null +++ b/outlook_web/services/channel_cache.py @@ -0,0 +1,43 @@ +""" +邮箱通道缓存:记住每个邮箱可用的读取方式(graph / imap),避免重复尝试必然失败的通道。 + +- graph: 有 Mail.Read 权限,直接走 Graph API +- imap: 无 Mail.Read 权限或 Graph API 持续失败,跳过 Graph 直走 IMAP +- TTL 1 小时后过期重新探测 +""" + +from __future__ import annotations + +import threading +import time +from typing import Optional + +_channel_cache: dict[str, dict] = {} +_cache_lock = threading.Lock() +_CACHE_TTL = 3600 # 1 hour + + +def get_cached_channel(email: str) -> Optional[str]: + """获取缓存的可用通道,过期返回 None。""" + with _cache_lock: + entry = _channel_cache.get(email) + if entry and entry["expires_at"] > time.time(): + return entry["method"] + if entry: + del _channel_cache[email] + return None + + +def set_cached_channel(email: str, method: str) -> None: + """缓存某邮箱的可用通道。""" + with _cache_lock: + _channel_cache[email] = { + "method": method, + "expires_at": time.time() + _CACHE_TTL, + } + + +def invalidate_channel(email: str) -> None: + """失效某邮箱的通道缓存。""" + with _cache_lock: + _channel_cache.pop(email, None) diff --git a/outlook_web/services/email_cache.py b/outlook_web/services/email_cache.py new file mode 100644 index 0000000..cc29091 --- /dev/null +++ b/outlook_web/services/email_cache.py @@ -0,0 +1,67 @@ +""" +邮件列表缓存:避免 2h 内重复拉取同一邮箱的同一文件夹。 + +- key: (email, folder) +- value: {emails, method, has_more, cached_at} +- TTL: 2 小时 +- 线程安全 +""" + +from __future__ import annotations + +import threading +import time +from typing import Any, Dict, List, Optional + +_email_cache: dict[str, dict] = {} +_cache_lock = threading.Lock() +_CACHE_TTL = 7200 # 2 hours + + +def _make_key(email: str, folder: str) -> str: + return f"{email}|{folder}" + + +def get_cached_emails(email: str, folder: str) -> Optional[Dict[str, Any]]: + """命中缓存返回 {emails, method, has_more},未命中或过期返回 None。""" + key = _make_key(email, folder) + with _cache_lock: + entry = _email_cache.get(key) + if entry and entry["expires_at"] > time.time(): + return { + "emails": entry["emails"], + "method": entry["method"], + "has_more": entry["has_more"], + } + if entry: + del _email_cache[key] + return None + + +def set_cached_emails( + email: str, + folder: str, + emails: List[Dict], + method: str, + has_more: bool = False, +) -> None: + """写入缓存。""" + key = _make_key(email, folder) + with _cache_lock: + _email_cache[key] = { + "emails": emails, + "method": method, + "has_more": has_more, + "expires_at": time.time() + _CACHE_TTL, + } + + +def invalidate_email_cache(email: str, folder: Optional[str] = None) -> None: + """失效缓存。folder=None 时清除该邮箱所有文件夹。""" + with _cache_lock: + if folder: + _email_cache.pop(_make_key(email, folder), None) + else: + keys_to_remove = [k for k in _email_cache if k.startswith(f"{email}|")] + for k in keys_to_remove: + del _email_cache[k] diff --git a/outlook_web/services/graph.py b/outlook_web/services/graph.py index 526018c..e7c9f87 100644 --- a/outlook_web/services/graph.py +++ b/outlook_web/services/graph.py @@ -88,10 +88,17 @@ def get_access_token_graph_result(client_id: str, refresh_token: str, proxy_url: # 根据 Microsoft Learn 文档:refresh token 可能会在每次使用时“自我替换”,应保存新的 refresh_token(如有)。 new_refresh_token = payload.get("refresh_token") + + # 检查返回的 scope 是否包含 Mail.Read 权限 + scope = payload.get("scope", "") + has_mail_read = "Mail.Read" in scope or "Mail.ReadWrite" in scope + return { "success": True, "access_token": access_token, "refresh_token": new_refresh_token, + "scope": scope, + "has_mail_read": has_mail_read, } except Exception as exc: return { @@ -127,6 +134,20 @@ def get_emails_graph( if not token_result.get("success"): return {"success": False, "error": token_result.get("error")} + # 无 Mail.Read 权限时直接跳过,不浪费一次 Graph API 请求 + if not token_result.get("has_mail_read"): + return { + "success": False, + "no_mail_permission": True, + "error": build_error_payload( + "GRAPH_NO_MAIL_PERMISSION", + "Token 无 Mail.Read 权限,跳过 Graph API", + "GraphPermissionError", + 403, + token_result.get("scope", ""), + ), + } + access_token = token_result.get("access_token") try: diff --git a/outlook_web/services/imap.py b/outlook_web/services/imap.py index 38bc89b..36ddb1c 100644 --- a/outlook_web/services/imap.py +++ b/outlook_web/services/imap.py @@ -261,28 +261,280 @@ def get_emails_imap_with_server( paged_ids = message_ids[start_idx:end_idx][::-1] + # 批量 FETCH:一次请求取所有消息(避免 N 次串行网络往返) + id_list = b",".join(paged_ids) emails_data = [] + status, msg_data = connection.fetch(id_list, "(RFC822)") + + if status == "OK" and msg_data: + for item in msg_data: + if not isinstance(item, tuple) or len(item) < 2: + continue + try: + raw_email = item[1] + msg = email.message_from_bytes(raw_email) + seq = item[0].split(b" ", 1)[0] if isinstance(item[0], bytes) else b"0" + msg_id_str = seq.decode() if isinstance(seq, bytes) else str(seq) + + body_preview = get_email_body(msg) + emails_data.append( + { + "id": msg_id_str, + "subject": decode_header_value(msg.get("Subject", "无主题")), + "from": decode_header_value(msg.get("From", "未知发件人")), + "date": msg.get("Date", "未知时间"), + "body_preview": (body_preview[:200] + "..." if len(body_preview) > 200 else body_preview), + } + ) + except Exception: + continue + + return {"success": True, "emails": emails_data} + except Exception as exc: + return { + "success": False, + "error": build_error_payload( + "EMAIL_FETCH_FAILED", + "获取邮件失败,请检查账号配置", + type(exc).__name__, + 500, + str(exc), + ), + } + finally: + if connection: + try: + connection.logout() + except Exception: + pass + + +IMAP_SERVER_OLD = "outlook.office365.com" + + +def stream_emails_imap( + account: str, + client_id: str, + refresh_token: str, + folder: str = "inbox", + skip: int = 0, + top: int = 20, +): + """Generator:逐条 yield 邮件,供 SSE 流式推送。并发尝试新旧 IMAP 服务器。""" + from concurrent.futures import ThreadPoolExecutor, as_completed + + token_result = get_access_token_imap_result(client_id, refresh_token) + if not token_result.get("success"): + yield {"type": "error", "data": token_result.get("error", {})} + return + + access_token = token_result.get("access_token") + + # 并发尝试两个 IMAP 服务器,用第一个成功连接的 + def _try_connect(server): + try: + conn = imaplib.IMAP4_SSL(server, IMAP_PORT) + auth_string = f"user={account}\1auth=Bearer {access_token}\1\1".encode("utf-8") + conn.authenticate("XOAUTH2", lambda x: auth_string) + + folder_map = { + "inbox": ['"INBOX"', "INBOX"], + "junkemail": ['"Junk"', '"Junk Email"', "Junk"], + "deleteditems": ['"Deleted"', '"Deleted Items"', '"Trash"', "Deleted"], + "trash": ['"Deleted"', '"Deleted Items"', '"Trash"', "Deleted"], + } + for imap_folder in folder_map.get((folder or "").lower(), ['"INBOX"']): + try: + status, _ = conn.select(imap_folder, readonly=True) + if status == "OK": + return conn, server + except Exception: + continue + conn.logout() + return None, server + except Exception: + return None, server + + connection = None + winner_server = None + with ThreadPoolExecutor(max_workers=2) as ex: + futures = [ex.submit(_try_connect, s) for s in [IMAP_SERVER_NEW, IMAP_SERVER_OLD]] + for f in as_completed(futures): + conn, srv = f.result() + if conn and not connection: + connection = conn + winner_server = srv + elif conn: + try: + conn.logout() + except Exception: + pass + + if not connection: + yield {"type": "error", "data": {"code": "IMAP_CONNECT_FAILED", "message": "所有 IMAP 服务器连接失败"}} + return + + try: + status, messages = connection.search(None, "ALL") + if status != "OK" or not messages or not messages[0]: + yield {"type": "done", "method": "IMAP", "count": 0} + return + + message_ids = messages[0].split() + total = len(message_ids) + start_idx = max(0, total - skip - top) + end_idx = total - skip + if start_idx >= end_idx: + yield {"type": "done", "method": "IMAP", "count": 0} + return + + paged_ids = message_ids[start_idx:end_idx][::-1] + method = "IMAP (New)" if winner_server == IMAP_SERVER_NEW else "IMAP (Old)" + count = 0 + for msg_id in paged_ids: try: status, msg_data = connection.fetch(msg_id, "(RFC822)") if status == "OK" and msg_data and msg_data[0]: raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) + body_preview = get_email_body(msg) + count += 1 + yield { + "type": "email", + "data": { + "id": (msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id)), + "subject": decode_header_value(msg.get("Subject", "无主题")), + "from": decode_header_value(msg.get("From", "未知发件人")), + "date": msg.get("Date", "未知时间"), + "body_preview": (body_preview[:200] + "..." if len(body_preview) > 200 else body_preview), + }, + } + except Exception: + continue + + yield {"type": "done", "method": method, "count": count} + finally: + try: + connection.logout() + except Exception: + pass + + +def _fetch_emails_with_token( + account: str, + access_token: str, + folder: str, + skip: int, + top: int, + server: str, +) -> Dict[str, Any]: + """使用已获取的 access_token 通过指定 IMAP 服务器读取邮件(内部方法)。""" + connection = None + try: + connection = imaplib.IMAP4_SSL(server, IMAP_PORT) + auth_string = f"user={account}\1auth=Bearer {access_token}\1\1".encode("utf-8") + connection.authenticate("XOAUTH2", lambda x: auth_string) + + folder_map = { + "inbox": ['"INBOX"', "INBOX"], + "junkemail": ['"Junk"', '"Junk Email"', "Junk", '"垃圾邮件"'], + "deleteditems": [ + '"Deleted"', + '"Deleted Items"', + '"Trash"', + "Deleted", + '"已删除邮件"', + ], + "trash": [ + '"Deleted"', + '"Deleted Items"', + '"Trash"', + "Deleted", + '"已删除邮件"', + ], + } + possible_folders = folder_map.get((folder or "").lower(), ['"INBOX"']) + + selected_folder = None + last_error = None + for imap_folder in possible_folders: + try: + status, response = connection.select(imap_folder, readonly=True) + if status == "OK": + selected_folder = imap_folder + break + last_error = f"select {imap_folder} status={status}" + except Exception as e: + last_error = f"select {imap_folder} error={str(e)}" + continue + + if not selected_folder: + return { + "success": False, + "error": build_error_payload( + "EMAIL_FETCH_FAILED", + "无法访问文件夹,请检查账号配置", + "IMAPSelectError", + 500, + {"last_error": last_error, "tried_folders": possible_folders}, + ), + } + + status, messages = connection.search(None, "ALL") + if status != "OK": + return { + "success": False, + "error": build_error_payload( + "EMAIL_FETCH_FAILED", + "获取邮件失败,请检查账号配置", + "IMAPSearchError", + 500, + f"search status={status}", + ), + } + if not messages or not messages[0]: + return {"success": True, "emails": [], "server": server} + + message_ids = messages[0].split() + total = len(message_ids) + start_idx = max(0, total - skip - top) + end_idx = total - skip + + if start_idx >= end_idx: + return {"success": True, "emails": [], "server": server} + + paged_ids = message_ids[start_idx:end_idx][::-1] + + # 批量 FETCH:一次请求取所有消息(避免 N 次串行网络往返) + id_list = b",".join(paged_ids) + emails_data = [] + status, msg_data = connection.fetch(id_list, "(RFC822)") + + if status == "OK" and msg_data: + for item in msg_data: + if not isinstance(item, tuple) or len(item) < 2: + continue + try: + raw_email = item[1] + msg = email.message_from_bytes(raw_email) + seq = item[0].split(b" ", 1)[0] if isinstance(item[0], bytes) else b"0" + msg_id_str = seq.decode() if isinstance(seq, bytes) else str(seq) body_preview = get_email_body(msg) emails_data.append( { - "id": (msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id)), + "id": msg_id_str, "subject": decode_header_value(msg.get("Subject", "无主题")), "from": decode_header_value(msg.get("From", "未知发件人")), "date": msg.get("Date", "未知时间"), "body_preview": (body_preview[:200] + "..." if len(body_preview) > 200 else body_preview), } ) - except Exception: - continue + except Exception: + continue - return {"success": True, "emails": emails_data} + return {"success": True, "emails": emails_data, "server": server} except Exception as exc: return { "success": False, @@ -302,6 +554,55 @@ def get_emails_imap_with_server( pass +def get_emails_imap_concurrent( + account: str, + client_id: str, + refresh_token: str, + folder: str = "inbox", + skip: int = 0, + top: int = 20, +) -> Dict[str, Any]: + """同时尝试新旧两个 IMAP 服务器,取第一个成功的结果。""" + from concurrent.futures import ThreadPoolExecutor, as_completed + + token_result = get_access_token_imap_result(client_id, refresh_token) + if not token_result.get("success"): + return {"success": False, "error": token_result.get("error")} + + access_token = token_result.get("access_token") + + with ThreadPoolExecutor(max_workers=2) as executor: + future_new = executor.submit( + _fetch_emails_with_token, account, access_token, folder, skip, top, IMAP_SERVER_NEW, + ) + future_old = executor.submit( + _fetch_emails_with_token, account, access_token, folder, skip, top, IMAP_SERVER_OLD, + ) + + all_errors = {} + for future in as_completed([future_new, future_old]): + result = future.result() + if result.get("success"): + server = result.pop("server", "IMAP") + method = "IMAP (New)" if server == IMAP_SERVER_NEW else "IMAP (Old)" + result["method"] = method + return result + else: + server = "new" if future is future_new else "old" + all_errors[f"imap_{server}"] = result.get("error") + + return { + "success": False, + "error": build_error_payload( + "EMAIL_FETCH_ALL_IMAP_FAILED", + "所有 IMAP 服务器均失败", + "IMAPError", + 502, + all_errors, + ), + } + + def get_email_detail_imap( account: str, client_id: str, diff --git a/static/js/features/emails.js b/static/js/features/emails.js index fb7ff43..8a55a23 100644 --- a/static/js/features/emails.js +++ b/static/js/features/emails.js @@ -45,75 +45,105 @@ container.innerHTML = `
${translateAppTextLocal('获取中…')}
`; - try { - // 每次只查询20封邮件 - const response = await fetch( - `/api/emails/${encodeURIComponent(email)}?method=${currentMethod}&folder=${currentFolder}&skip=0&top=20` - ); - const data = await response.json(); + // SSE 流式加载:每收到一封邮件立即渲染,不等全部完成 + const forceParam = forceRefresh ? '&force=1' : ''; + const streamUrl = `/api/emails/${encodeURIComponent(email)}/stream?folder=${currentFolder}&skip=0&top=20${forceParam}`; + const evtSource = new EventSource(streamUrl); + let streamEmails = []; + let firstEmail = true; + const clickHandler = isTempEmailGroup ? 'getTempEmailDetail' : 'selectEmail'; - if (data.success) { - currentEmails = data.emails; - currentMethod = data.method === 'Graph API' ? 'graph' : 'imap'; - hasMoreEmails = data.has_more; - if (typeof syncAccountSummaryToAccountCache === 'function' && data.account_summary) { - syncAccountSummaryToAccountCache(email, data.account_summary); - } + evtSource.addEventListener('email', function(e) { + const emailData = JSON.parse(e.data); + streamEmails.push(emailData); - if (typeof syncAccountSummaryToAccountCache === 'function' && data.account_summary) { - syncAccountSummaryToAccountCache(email, data.account_summary); - } + // 首封邮件到达时清除 loading + if (firstEmail) { + container.innerHTML = ''; + firstEmail = false; + } - // 保存到缓存 - emailListCache[cacheKey] = { - emails: currentEmails, - has_more: hasMoreEmails, - skip: currentSkip, - method: currentMethod - }; + // 逐条 DOM append + const index = streamEmails.length - 1; + const initial = (emailData.from || '?')[0].toUpperCase(); + const div = document.createElement('div'); + div.className = `email-item${emailData.is_read === false ? ' unread' : ''}`; + div.setAttribute('onclick', `${clickHandler}('${emailData.id}', ${index})`); + div.innerHTML = ` + + + + + `; + container.appendChild(div); - // 显示使用的方法和邮件数量 - const methodTag = document.getElementById('methodTag'); - methodTag.textContent = data.method; - methodTag.style.display = 'inline'; + // 实时更新计数 + document.getElementById('emailCount').textContent = `(${streamEmails.length})`; + }); - document.getElementById('emailCount').textContent = `(${data.emails.length})`; + evtSource.addEventListener('done', function(e) { + const info = JSON.parse(e.data); + evtSource.close(); - renderEmailList(data.emails); - } else { - // 显示详细的多方法失败弹框 - if (data.details) { - showEmailFetchErrorModal(data.details); - } else { - handleApiError(data, '获取邮件失败'); - } + currentEmails = streamEmails; + currentMethod = info.method.includes('Graph API') ? 'graph' : 'imap'; + hasMoreEmails = info.has_more || false; + + // 如果没收到任何邮件 + if (streamEmails.length === 0) { container.innerHTML = `
- ⚠️

${translateAppTextLocal('获取邮件失败,')}${translateAppTextLocal('点击查看详情')}

+ 📭 +

${translateAppTextLocal('收件箱为空')}

`; - lastFetchErrorDetails = data.details || {}; - // 绑定事件监听器 - const errorLink = document.getElementById('showEmailErrorLink'); - if (errorLink) { - errorLink.addEventListener('click', () => showEmailFetchErrorModal(lastFetchErrorDetails)); - } } - } catch (error) { - console.error('加载邮件列表失败:', error); - container.innerHTML = ` -
- ⚠️

${translateAppTextLocal('网络错误,请重试')}

-
- `; - } finally { + + // 更新 method tag + const methodTag = document.getElementById('methodTag'); + methodTag.textContent = info.method; + methodTag.style.display = 'inline'; + + // 保存到缓存 + emailListCache[cacheKey] = { + emails: currentEmails, + has_more: hasMoreEmails, + skip: currentSkip, + method: currentMethod + }; + // 启用按钮 if (refreshBtn) { refreshBtn.disabled = false; refreshBtn.textContent = translateAppTextLocal('获取邮件'); } folderTabs.forEach(tab => tab.disabled = false); - } + }); + + evtSource.addEventListener('error', function(e) { + // SSE 自身 error event(非服务端 error event) + if (evtSource.readyState === EventSource.CLOSED) return; + evtSource.close(); + + if (streamEmails.length === 0) { + container.innerHTML = ` +
+ ⚠️

${translateAppTextLocal('获取邮件失败')}

+
+ `; + } + if (refreshBtn) { + refreshBtn.disabled = false; + refreshBtn.textContent = translateAppTextLocal('获取邮件'); + } + folderTabs.forEach(tab => tab.disabled = false); + }); } // 渲染邮件列表