Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ EXPOSE 5000
# 健康检查
HEALTHCHECK --interval=30s --timeout=5s --start-period=20s CMD ["python","-c","import urllib.request as u; u.urlopen('http://localhost:5000/healthz', timeout=4).read()"]

# 启动应用(使用 Gunicorn,单 worker 避免 session 共享问题
# 注意:禁用 --preload,避免在 master 进程中启动后台调度线程
CMD ["gunicorn", "-w", "1", "-b", "0.0.0.0:5000", "--timeout", "120", "--access-logfile", "-", "web_outlook_app:app"]
# 启动应用(单 worker + 多线程:保持调度器单实例,同时支持并发请求处理
# 瓶颈是网络 I/O(Graph API / IMAP),GIL 在 I/O 期间释放,线程并发有效
CMD ["gunicorn", "-w", "1", "--threads", "8", "-b", "0.0.0.0:5000", "--timeout", "120", "--access-logfile", "-", "web_outlook_app:app"]
310 changes: 202 additions & 108 deletions outlook_web/controllers/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging
from typing import Any

from flask import jsonify, request
import json

from flask import Response, jsonify, request, stream_with_context

from outlook_web import config
from outlook_web.audit import log_audit
Expand All @@ -18,6 +20,8 @@
from outlook_web.services import external_api as external_api_service
from outlook_web.services import graph as graph_service
from outlook_web.services import imap as imap_service
from outlook_web.services.channel_cache import get_cached_channel, set_cached_channel
from outlook_web.services.email_cache import get_cached_emails, set_cached_emails
from outlook_web.services.imap_generic import (
get_email_detail_imap_generic_result,
get_emails_imap_generic,
Expand Down Expand Up @@ -116,6 +120,16 @@ def api_get_emails(email_addr: str) -> Any:
)
return jsonify(result)

# 服务端缓存(2h TTL)
cached = get_cached_emails(email_addr, folder)
if cached:
return jsonify({
"success": True,
"emails": cached["emails"],
"method": cached["method"] + " (cached)",
"has_more": cached["has_more"],
})

# 获取分组代理设置
proxy_url = ""
if account.get("group_id"):
Expand All @@ -125,143 +139,126 @@ def api_get_emails(email_addr: str) -> Any:

# 收集所有错误信息
all_errors = {}
graph_result = None

# 通道缓存:已知走 IMAP 的账号直接跳过 Graph API
cached_channel = get_cached_channel(email_addr)

# 1. 尝试 Graph API(缓存为 imap 时跳过)
if cached_channel != "imap":
graph_result = graph_service.get_emails_graph(account["client_id"], account["refresh_token"], folder, skip, top, proxy_url)
if graph_result.get("success"):
set_cached_channel(email_addr, "graph")
emails = graph_result.get("emails", [])
account_summary = compact_summary_service.update_summary_from_message_list(
int(account["id"]),
emails,
folder=folder,
)
# 更新刷新时间,同时保存 Microsoft 可能返回的新 refresh_token(Token Rotation)
db = get_db()
new_rt = graph_result.get("new_refresh_token")
if new_rt and new_rt != account.get("refresh_token"):
from outlook_web.security.crypto import encrypt_data as _encrypt_data

# 1. 尝试 Graph API
graph_result = graph_service.get_emails_graph(account["client_id"], account["refresh_token"], folder, skip, top, proxy_url)
if graph_result.get("success"):
emails = graph_result.get("emails", [])
account_summary = compact_summary_service.update_summary_from_message_list(
int(account["id"]),
emails,
folder=folder,
)
# 更新刷新时间,同时保存 Microsoft 可能返回的新 refresh_token(Token Rotation)
db = get_db()
new_rt = graph_result.get("new_refresh_token")
if new_rt and new_rt != account.get("refresh_token"):
from outlook_web.security.crypto import encrypt_data as _encrypt_data

try:
db.execute(
"UPDATE accounts SET refresh_token = ?, last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?",
(_encrypt_data(new_rt), email_addr),
)
except Exception:
try:
db.execute(
"UPDATE accounts SET refresh_token = ?, last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?",
(_encrypt_data(new_rt), email_addr),
)
except Exception:
db.execute(
"UPDATE accounts SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?",
(email_addr,),
)
else:
db.execute(
"UPDATE accounts SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE email = ?",
"""
UPDATE accounts
SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE email = ?
""",
(email_addr,),
)
else:
db.execute(
"""
UPDATE accounts
SET last_refresh_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE email = ?
""",
(email_addr,),
)
db.commit()
db.commit()

# 格式化 Graph API 返回的数据
formatted = []
for e in emails:
formatted.append(
{
"id": e.get("id"),
"subject": e.get("subject", "无主题"),
"from": e.get("from", {}).get("emailAddress", {}).get("address", "未知"),
"date": e.get("receivedDateTime", ""),
"is_read": e.get("isRead", False),
"has_attachments": e.get("hasAttachments", False),
"body_preview": e.get("bodyPreview", ""),
}
)

# 格式化 Graph API 返回的数据
formatted = []
for e in emails:
formatted.append(
return jsonify(
{
"id": e.get("id"),
"subject": e.get("subject", "无主题"),
"from": e.get("from", {}).get("emailAddress", {}).get("address", "未知"),
"date": e.get("receivedDateTime", ""),
"is_read": e.get("isRead", False),
"has_attachments": e.get("hasAttachments", False),
"body_preview": e.get("bodyPreview", ""),
"success": True,
"emails": formatted,
"method": "Graph API",
"has_more": len(formatted) >= top,
"account_summary": account_summary,
}
)
else:
graph_error = graph_result.get("error")
all_errors["graph"] = graph_error

# 无 Mail.Read 权限 → 缓存为 imap,后续请求直接跳过 Graph
if graph_result.get("no_mail_permission"):
set_cached_channel(email_addr, "imap")

# 如果是代理错误,不再回退 IMAP
if isinstance(graph_error, dict) and graph_error.get("type") in (
"ProxyError",
"ConnectionError",
):
return build_error_response(
"EMAIL_PROXY_CONNECTION_FAILED",
"代理连接失败,请检查分组代理设置",
message_en="Proxy connection failed. Please check the group proxy settings",
err_type="ProxyError",
status=502,
details=all_errors,
extra={"details": all_errors},
)

return jsonify(
{
"success": True,
"emails": formatted,
"method": "Graph API",
"has_more": len(formatted) >= top,
"account_summary": account_summary,
}
)
else:
graph_error = graph_result.get("error")
all_errors["graph"] = graph_error

# 如果是代理错误,不再回退 IMAP
if isinstance(graph_error, dict) and graph_error.get("type") in (
"ProxyError",
"ConnectionError",
):
return build_error_response(
"EMAIL_PROXY_CONNECTION_FAILED",
"代理连接失败,请检查分组代理设置",
message_en="Proxy connection failed. Please check the group proxy settings",
err_type="ProxyError",
status=502,
details=all_errors,
extra={"details": all_errors},
)

imap_new_result = imap_service.get_emails_imap_with_server(
account["email"],
account["client_id"],
account["refresh_token"],
folder,
skip,
top,
IMAP_SERVER_NEW,
)
if imap_new_result.get("success"):
account_summary = compact_summary_service.update_summary_from_message_list(
int(account["id"]),
imap_new_result.get("emails", []),
folder=folder,
)
return jsonify(
{
"success": True,
"emails": imap_new_result.get("emails", []),
"method": "IMAP (New)",
"has_more": False, # IMAP 分页暂未完全实现
"account_summary": account_summary,
}
)
else:
all_errors["imap_new"] = imap_new_result.get("error")

# 3. 尝试旧版 IMAP (outlook.office365.com)
imap_old_result = imap_service.get_emails_imap_with_server(
# 2. IMAP 回退:新旧服务器并发尝试
imap_result = imap_service.get_emails_imap_concurrent(
account["email"],
account["client_id"],
account["refresh_token"],
folder,
skip,
top,
IMAP_SERVER_OLD,
)
if imap_old_result.get("success"):
if imap_result.get("success"):
set_cached_channel(email_addr, "imap")
account_summary = compact_summary_service.update_summary_from_message_list(
int(account["id"]),
imap_old_result.get("emails", []),
imap_result.get("emails", []),
folder=folder,
)
return jsonify(
{
"success": True,
"emails": imap_old_result.get("emails", []),
"method": "IMAP (Old)",
"emails": imap_result.get("emails", []),
"method": imap_result.get("method", "IMAP"),
"has_more": False,
"account_summary": account_summary,
}
)
else:
all_errors["imap_old"] = imap_old_result.get("error")
all_errors["imap"] = imap_result.get("error")

# 所有方式均失败;若 Graph API 明确返回 token 过期,优先提示重新授权
if graph_result.get("auth_expired"):
if graph_result and graph_result.get("auth_expired"):
return build_error_response(
"ACCOUNT_AUTH_EXPIRED",
"账号授权已失效,请前往「刷新 Token」页面重新授权",
Expand All @@ -281,6 +278,103 @@ def api_get_emails(email_addr: str) -> Any:
)


@login_required
def api_stream_emails(email_addr: str) -> Any:
"""SSE 流式获取邮件:IMAP 每拉到一封就推一条 event,前端边收边渲染。"""
account = accounts_repo.get_account_by_email(email_addr)
if not account:
return build_error_response(
"ACCOUNT_NOT_FOUND", "账号不存在",
message_en="Account not found", err_type="NotFoundError",
status=404, details=f"email={email_addr}",
)

folder = request.args.get("folder", "inbox")
skip = int(request.args.get("skip", 0))
top = int(request.args.get("top", 20))
force = request.args.get("force", "").lower() in ("1", "true")

account_type = (account.get("account_type") or "outlook").strip().lower()

# 获取分组代理设置
proxy_url = ""
if account.get("group_id"):
group = groups_repo.get_group_by_id(account["group_id"])
if group:
proxy_url = group.get("proxy_url", "") or ""

def _sse(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

def generate():
# --- 服务端缓存(2h TTL)---
if not force:
cached = get_cached_emails(email_addr, folder)
if cached:
for e in cached["emails"]:
yield _sse("email", e)
yield _sse("done", {
"method": cached["method"] + " (cached)",
"count": len(cached["emails"]),
"has_more": cached["has_more"],
})
return

collected_emails = []
cached_channel = get_cached_channel(email_addr)

# --- Graph API 尝试 ---
if account_type != "imap" and cached_channel != "imap":
graph_result = graph_service.get_emails_graph(
account["client_id"], account["refresh_token"], folder, skip, top, proxy_url,
)
if graph_result.get("success"):
set_cached_channel(email_addr, "graph")
emails = graph_result.get("emails", [])
for e in emails:
fmt = {
"id": e.get("id"),
"subject": e.get("subject", "无主题"),
"from": e.get("from", {}).get("emailAddress", {}).get("address", "未知"),
"date": e.get("receivedDateTime", ""),
"is_read": e.get("isRead", False),
"has_attachments": e.get("hasAttachments", False),
"body_preview": e.get("bodyPreview", ""),
}
collected_emails.append(fmt)
yield _sse("email", fmt)
has_more = len(emails) >= top
set_cached_emails(email_addr, folder, collected_emails, "Graph API", has_more)
yield _sse("done", {"method": "Graph API", "count": len(emails), "has_more": has_more})
return
elif graph_result.get("no_mail_permission"):
set_cached_channel(email_addr, "imap")

# --- IMAP 流式 ---
yield _sse("status", {"message": "IMAP connecting..."})
imap_method = "IMAP"
for item in imap_service.stream_emails_imap(
account["email"], account["client_id"], account["refresh_token"],
folder, skip, top,
):
if item.get("type") == "email":
collected_emails.append(item["data"])
yield _sse("email", item["data"])
elif item.get("type") == "done":
imap_method = item.get("method", "IMAP")
set_cached_channel(email_addr, "imap")
set_cached_emails(email_addr, folder, collected_emails, imap_method, False)
yield _sse("done", {"method": imap_method, "count": item.get("count", 0), "has_more": False})
elif item.get("type") == "error":
yield _sse("error", item.get("data", {}))

return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)


@login_required
def api_delete_emails() -> Any:
"""批量删除邮件(永久删除)"""
Expand Down
5 changes: 5 additions & 0 deletions outlook_web/routes/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def create_blueprint() -> Blueprint:
view_func=emails_controller.api_get_emails,
methods=["GET"],
)
bp.add_url_rule(
"/api/emails/<email_addr>/stream",
view_func=emails_controller.api_stream_emails,
methods=["GET"],
)
bp.add_url_rule(
"/api/emails/<email_addr>/extract-verification",
view_func=emails_controller.api_extract_verification,
Expand Down
Loading