diff --git a/.env.example b/.env.example index 6d0ef5b..25e8f29 100644 --- a/.env.example +++ b/.env.example @@ -85,6 +85,8 @@ OPENAI_API_KEY=sk-your-api-key-here # === Optional: Tavily web search === # TAVILY_API_KEY= # Enables structured search in blog research and graph expansion +# TAVILY_MCP_URL= # Optional MCP proxy endpoint; used before TAVILY_API_KEY when set +# TAVILY_MCP_TOKEN= # Bearer token for TAVILY_MCP_URL # === Optional: Google Search Console === # GOOGLE_GSC_CREDENTIALS= # JSON string with client_id, client_secret, refresh_token diff --git a/frontend/src/components/chat/ChatContainer.tsx b/frontend/src/components/chat/ChatContainer.tsx index eaecd61..84a5639 100644 --- a/frontend/src/components/chat/ChatContainer.tsx +++ b/frontend/src/components/chat/ChatContainer.tsx @@ -65,6 +65,7 @@ export function ChatContainer({ hasMessages, projectId, projectContext, + compact = false, }: { messages: ChatMessage[]; isStreaming: boolean; @@ -73,6 +74,7 @@ export function ChatContainer({ hasMessages: boolean; projectId: number | null; projectContext?: ChatProjectContext | null; + compact?: boolean; }) { const { t } = useI18n(); const projectName = projectContext?.project.brand_name ?? null; @@ -95,13 +97,16 @@ export function ChatContainer({ sendMessage(prompt)} + compact={compact} /> -
- sendMessage(prompt)} - projectName={projectName} - /> -
+ {!compact && ( +
+ sendMessage(prompt)} + projectName={projectName} + /> +
+ )} ) : ( void; + compact?: boolean; }) { const { t } = useI18n(); const { project, scores, keywords, competitors, keyword_gaps, findings } = @@ -118,7 +120,7 @@ export function ProjectContextCard({ {/* Score cards */} -
+
{t("chat.contextQuickStart")}

-
+
{suggestions.map((s) => ( + ); + } + + return ( + + + {t(action.labelKey)} + + ); +} + +function QueueRow({ + row, + countFormatter, + onDiscuss, +}: { + row: AgentRow; + countFormatter: Intl.NumberFormat; + onDiscuss?: (prompt: string) => void; +}) { + const { t } = useI18n(); + const Icon = row.icon; + const style = STATUS_STYLES[row.tone]; + const count = row.count == null ? null : countFormatter.format(row.count); + const handleDiscuss = () => onDiscuss?.(t(row.discussPromptKey)); + + return ( +
+
+
+ +
+
+
+

{t(row.titleKey)}

+ + + {t(row.statusKey)} + +
+
+ {count != null && ( + + {count} {t(row.countLabelKey)} + + )} + {t(row.descriptionKey)} +
+
+
+ +
+ {row.actions.map((action, index) => ( + + ))} +
+
+ ); +} + +export function AgentActionQueue({ + projectId, + latest, + latestMonitoring, + latestReports, + pendingApprovals = 0, + blogDraftsCount = 0, + competitorCount = 0, + keywordCount = 0, + onDiscuss, +}: AgentActionQueueProps) { + const { t, locale } = useI18n(); + const { data: feedItems = [], isFetching } = useQuery({ + queryKey: ["agent-action-queue", projectId, locale], + queryFn: () => apiJson(`/projects/${projectId}/action-feed?lang=${locale}`), + retry: 1, + staleTime: 60_000, + }); + const countFormatter = useMemo(() => new Intl.NumberFormat(locale), [locale]); + const rows = useMemo( + () => + buildRows({ + projectId, + latest, + latestMonitoring, + latestReports, + pendingApprovals, + blogDraftsCount, + competitorCount, + keywordCount, + feedItems, + }), + [ + projectId, + latest, + latestMonitoring, + latestReports, + pendingApprovals, + blogDraftsCount, + competitorCount, + keywordCount, + feedItems, + ], + ); + + return ( +
+
+
+
+ +
+
+

{t(LABEL_KEYS.title)}

+

{t(LABEL_KEYS.subtitle)}

+
+
+ +
+ {isFetching ? ( + <> + + {t(LABEL_KEYS.loading)} + + ) : feedItems.length > 0 ? ( + <> + + {t(LABEL_KEYS.apiEnhanced)} + + ) : ( + <> + + {t(LABEL_KEYS.fallback)} + + )} +
+
+ +
+ {rows.map((row) => ( + + ))} +
+
+ ); +} diff --git a/frontend/src/components/project/ProjectChatPanel.tsx b/frontend/src/components/project/ProjectChatPanel.tsx new file mode 100644 index 0000000..0c79fb3 --- /dev/null +++ b/frontend/src/components/project/ProjectChatPanel.tsx @@ -0,0 +1,92 @@ +import { useEffect, useRef } from "react"; +import { ChatContainer } from "../chat/ChatContainer"; +import { useChat } from "../../hooks/useChat"; +import { useChatContext } from "../../hooks/useChatContext"; +import { useI18n } from "../../i18n"; + +interface ProjectChatPanelProps { + projectId: number; + projectName: string; + initialPrompt?: string; + onPromptConsumed?: () => void; +} + +export function ProjectChatPanel({ + projectId, + projectName, + initialPrompt, + onPromptConsumed, +}: ProjectChatPanelProps) { + const chat = useChat(projectId); + const { data: chatContext } = useChatContext(projectId); + const { t } = useI18n(); + const consumedPromptRef = useRef(null); + const { + messages, + isStreaming, + currentAgent, + projectId: activeProjectId, + sendMessage, + selectProject, + sessionReady, + } = chat; + + useEffect(() => { + if (!sessionReady || activeProjectId === projectId) return; + void selectProject(projectId); + }, [activeProjectId, projectId, selectProject, sessionReady]); + + useEffect(() => { + const prompt = initialPrompt?.trim() ?? ""; + if (!prompt) { + consumedPromptRef.current = null; + return; + } + if (!sessionReady || isStreaming || activeProjectId !== projectId) return; + if (consumedPromptRef.current === prompt) return; + + consumedPromptRef.current = prompt; + void sendMessage(prompt); + onPromptConsumed?.(); + }, [activeProjectId, initialPrompt, isStreaming, onPromptConsumed, projectId, sendMessage, sessionReady]); + + return ( +
+
+

+ {t("chat.title")} +

+
+

+ {t("chat.currentProject")} +

+

+ {projectName} +

+
+
+ +
+ {!sessionReady ? ( +
+ {t("chat.thinking")} +
+ ) : ( + 0} + projectId={activeProjectId} + projectContext={chatContext ?? null} + compact + /> + )} +
+
+ ); +} diff --git a/frontend/src/components/project/ProjectConsoleStatus.tsx b/frontend/src/components/project/ProjectConsoleStatus.tsx new file mode 100644 index 0000000..d1aeaae --- /dev/null +++ b/frontend/src/components/project/ProjectConsoleStatus.tsx @@ -0,0 +1,195 @@ +import { + AlertCircle, + CheckCircle2, + Clock3, + PauseCircle, + Radar, + TerminalSquare, +} from "lucide-react"; +import type { TranslationKey } from "../../i18n"; +import { useI18n } from "../../i18n"; +import type { LatestScans, MonitoringSummary, Project } from "../../types"; +import { utcDate } from "../../utils/time"; + +type ConsoleStatus = "completed" | "running" | "paused" | "needsReview"; +type CommandCenterKey = `commandCenter.${string}`; + +type ProjectConsoleStatusProps = { + project: Project; + latestMonitoring?: MonitoringSummary | null; + isPaused?: boolean; + pendingApprovals?: number; + latest: LatestScans; +}; + +const STATUS_STYLES: Record< + ConsoleStatus, + { + badge: string; + dot: string; + icon: typeof CheckCircle2; + } +> = { + completed: { + badge: "border-emerald-200 bg-emerald-50 text-emerald-700", + dot: "bg-emerald-500", + icon: CheckCircle2, + }, + running: { + badge: "border-blue-200 bg-blue-50 text-blue-700", + dot: "bg-blue-500", + icon: Radar, + }, + paused: { + badge: "border-amber-200 bg-amber-50 text-amber-700", + dot: "bg-amber-500", + icon: PauseCircle, + }, + needsReview: { + badge: "border-rose-200 bg-rose-50 text-rose-700", + dot: "bg-rose-500", + icon: AlertCircle, + }, +}; + +function asTranslationKey(key: CommandCenterKey): TranslationKey { + return key as TranslationKey; +} + +function getConsoleStatus({ + latestMonitoring, + isPaused, + pendingApprovals, +}: { + latestMonitoring?: MonitoringSummary | null; + isPaused?: boolean; + pendingApprovals: number; +}): ConsoleStatus { + if (isPaused) return "paused"; + if (latestMonitoring?.status === "pending" || latestMonitoring?.status === "running") return "running"; + if (pendingApprovals > 0) return "needsReview"; + return "completed"; +} + +function parseTimestamp(value?: string | null): number | null { + if (!value) return null; + const timestamp = utcDate(value).getTime(); + return Number.isFinite(timestamp) ? timestamp : null; +} + +function getLatestScanDate(latest: LatestScans, latestMonitoring?: MonitoringSummary | null): Date | null { + const timestamps = [ + parseTimestamp(latest.seo?.scanned_at), + parseTimestamp(latest.geo?.scanned_at), + parseTimestamp(latest.community?.scanned_at), + parseTimestamp(latestMonitoring?.completed_at), + parseTimestamp(latestMonitoring?.created_at), + ...latest.serp.map((snapshot) => parseTimestamp(snapshot.checked_at)), + ].filter((value): value is number => value != null); + + if (timestamps.length === 0) return null; + return new Date(Math.max(...timestamps)); +} + +function getTargetHost(url: string): string { + try { + return new URL(url).hostname.replace(/^www\./, ""); + } catch { + return url; + } +} + +function formatScanDate(date: Date, locale: string): string { + return new Intl.DateTimeFormat(locale, { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + }).format(date); +} + +export function ProjectConsoleStatus({ + project, + latestMonitoring, + isPaused, + pendingApprovals = 0, + latest, +}: ProjectConsoleStatusProps) { + const { locale, t } = useI18n(); + const tc = (key: CommandCenterKey, params?: Record) => + t(asTranslationKey(key), params); + + const status = getConsoleStatus({ latestMonitoring, isPaused, pendingApprovals }); + const statusStyle = STATUS_STYLES[status]; + const StatusIcon = statusStyle.icon; + const latestScanDate = getLatestScanDate(latest, latestMonitoring); + const targetHost = getTargetHost(project.url); + const categoryLabel = + project.category === "auto" ? tc("commandCenter.categoryAuto") : project.category; + const scheduleKey = isPaused ? "commandCenter.schedule.paused" : "commandCenter.schedule.daily"; + const scanValue = latestScanDate + ? formatScanDate(latestScanDate, locale) + : tc("commandCenter.scan.none"); + + return ( +
+
+
+
+ +
+
+

+ {tc("commandCenter.statusBarTitle")} +

+
+ + {project.brand_name} + + + {targetHost} + +
+
+
+ +
+
+
{tc("commandCenter.status.label")}
+
+ + {status === "running" && ( + + )} + + + + + {tc(`commandCenter.status.${status}` as CommandCenterKey)} + +
+
+ +
+
+ + {tc("commandCenter.schedule.label")} +
+
+ {tc(scheduleKey as CommandCenterKey, { category: categoryLabel })} +
+
+ +
+
+ {tc("commandCenter.scan.label")} +
+
+ {scanValue} +
+
+
+
+
+ ); +} diff --git a/frontend/src/components/project/ScorePanel.tsx b/frontend/src/components/project/ScorePanel.tsx index b4403cf..8808352 100644 --- a/frontend/src/components/project/ScorePanel.tsx +++ b/frontend/src/components/project/ScorePanel.tsx @@ -79,11 +79,13 @@ export function ScorePanel({ previous, latestMonitoring, projectId, + compact = false, }: { latest: LatestScans; previous?: ProjectSummary["previous"]; latestMonitoring?: MonitoringSummary | null; projectId?: number; + compact?: boolean; }) { const { t } = useI18n(); const [selectedKind, setSelectedKind] = useState(null); @@ -105,7 +107,7 @@ export function ScorePanel({ return ( <> -
+
> = { "command.priorityFindingsBody": "{{count}} 个发现项可以转成修复、任务或内容更新。", "command.priorityReportTitle": "打开最新简报", "command.priorityReportBody": "报告已准备好,可用于同步团队当前变化和下一步交付。", + "commandCenter.statusBarTitle": "OpenCMO 运行状态", + "commandCenter.categoryAuto": "自动识别品类", + "commandCenter.status.label": "状态", + "commandCenter.status.completed": "已完成", + "commandCenter.status.running": "运行中", + "commandCenter.status.paused": "已暂停", + "commandCenter.status.needsReview": "待复核", + "commandCenter.schedule.label": "计划", + "commandCenter.schedule.daily": "每日监控 · {{category}}", + "commandCenter.schedule.paused": "已暂停 · {{category}}", + "commandCenter.scan.label": "最新信号", + "commandCenter.scan.none": "尚无扫描", + "commandCenter.company.title": "公司", + "commandCenter.company.category": "品类", + "commandCenter.company.competitors": "竞品", + "commandCenter.company.keywords": "关键词", + "commandCenter.company.review": "复核", + "commandCenter.company.reviewValue": "{{approvals}} 个待审,{{drafts}} 个草稿", + "commandCenter.company.workflowTitle": "工作流", + "commandCenter.company.workflowObserve": "观察:抓取、搜索、AI 回答和社区来源。", + "commandCenter.company.workflowDecide": "判断:智能体排序哪些变化重要,以及为什么重要。", + "commandCenter.company.workflowShip": "发布:外部内容先进入人工复核,再决定是否发布。", + "commandCenter.analytics.title": "分析", + "commandCenter.analytics.subtitle": "分数只是证据,不是终点。旁边的队列会把证据转成下一步动作。", + "commandCenter.scanHistory": "扫描历史", + "commandCenter.reviewGuardrail.title": "复核保护", + "commandCenter.reviewGuardrail.body": "外部帖子、回复和社交草稿会保留在「复核与发布」里,直到人工批准。OpenCMO 可以准备内容,但不应在未经复核的情况下发布到 Reddit、X、LinkedIn 或 Hacker News。", + "commandCenter.agentQueue.title": "智能体行动队列", + "commandCenter.agentQueue.subtitle": "渠道智能体把监控结果转成机会、草稿和待复核动作。", + "commandCenter.agentQueue.apiEnhanced": "实时信号", + "commandCenter.agentQueue.fallback": "回退视图", + "commandCenter.agentQueue.loading": "更新中", + "commandCenter.agentQueue.cta.viewDetails": "详情", + "commandCenter.agentQueue.cta.review": "复核", + "commandCenter.agentQueue.cta.generate": "生成", + "commandCenter.agentQueue.cta.discuss": "讨论", + "commandCenter.agentQueue.count.actions": "个动作", + "commandCenter.agentQueue.count.drafts": "个草稿", + "commandCenter.agentQueue.count.fixes": "个修复", + "commandCenter.agentQueue.count.opportunities": "个机会", + "commandCenter.agentQueue.count.signals": "个信号", + "commandCenter.agentQueue.count.threads": "个帖子", + "commandCenter.agentQueue.status.briefQueued": "简报排队中", + "commandCenter.agentQueue.status.canGenerate": "可生成", + "commandCenter.agentQueue.status.draftsReady": "草稿已就绪", + "commandCenter.agentQueue.status.fixesQueued": "修复已排队", + "commandCenter.agentQueue.status.monitoring": "监控中", + "commandCenter.agentQueue.status.needsInputs": "需补充输入", + "commandCenter.agentQueue.status.needsReview": "待复核", + "commandCenter.agentQueue.status.needsScan": "需扫描", + "commandCenter.agentQueue.status.opportunitiesReady": "机会已就绪", + "commandCenter.agentQueue.status.reportReady": "报告已就绪", + "commandCenter.agentQueue.status.visibilityReady": "可见度已就绪", + "commandCenter.agentQueue.agent.seo.title": "SEO 智能体", + "commandCenter.agentQueue.agent.seo.description.ready": "技术和 PageSpeed 问题已可检查。", + "commandCenter.agentQueue.agent.seo.description.pending": "运行或刷新扫描,建立 SEO 基线。", + "commandCenter.agentQueue.agent.geo.title": "AI 搜索智能体", + "commandCenter.agentQueue.agent.geo.description.ready": "AI 回答可见度已有最新读数。", + "commandCenter.agentQueue.agent.geo.description.pending": "尚未收集 AI 搜索快照。", + "commandCenter.agentQueue.agent.community.title": "社区智能体", + "commandCenter.agentQueue.agent.community.description.ready": "社区需求信号已可复核。", + "commandCenter.agentQueue.agent.community.description.pending": "正在等待更多公开讨论信号。", + "commandCenter.agentQueue.agent.reddit.title": "Reddit 智能体", + "commandCenter.agentQueue.agent.reddit.description.ready": "相关帖子或回复机会已就绪。", + "commandCenter.agentQueue.agent.reddit.description.pending": "让智能体把社区信号转成 Reddit 回复草稿。", + "commandCenter.agentQueue.agent.twitter.title": "X 智能体", + "commandCenter.agentQueue.agent.twitter.description.ready": "X/Twitter 草稿正在等待人工复核。", + "commandCenter.agentQueue.agent.twitter.description.pending": "从最新信号生成推文想法或短 thread。", + "commandCenter.agentQueue.agent.linkedin.title": "LinkedIn 智能体", + "commandCenter.agentQueue.agent.linkedin.description.ready": "LinkedIn 文案正在等待复核。", + "commandCenter.agentQueue.agent.linkedin.description.pending": "从最强发现生成一条专业帖。", + "commandCenter.agentQueue.agent.hackerNews.title": "Hacker News 智能体", + "commandCenter.agentQueue.agent.hackerNews.description.ready": "HN 讨论机会已可检查。", + "commandCenter.agentQueue.agent.hackerNews.description.pending": "只有当语境合适时,才生成谨慎的 HN 角度。", + "commandCenter.agentQueue.agent.content.title": "内容智能体", + "commandCenter.agentQueue.agent.content.description.ready": "内容草稿或内容动作已就绪。", + "commandCenter.agentQueue.agent.content.description.pending": "基线清楚后生成博客或发布草稿。", + "commandCenter.agentQueue.agent.report.title": "报告智能体", + "commandCenter.agentQueue.agent.report.description.ready": "监控摘要已可进入下一份简报。", + "commandCenter.agentQueue.agent.report.description.pending": "先运行监控,再生成报告包。", + "commandCenter.agentQueue.prompt.seo": "解释这个项目最重要的 SEO 或技术修复,并告诉我第一步应该做什么。", + "commandCenter.agentQueue.prompt.geo": "解释这个项目在 AI 搜索中的呈现方式,以及怎样提升回答可见度。", + "commandCenter.agentQueue.prompt.community": "复核当前社区信号,判断哪些机会值得行动。", + "commandCenter.agentQueue.prompt.reddit": "找出这个项目最适合的 Reddit 机会,并起草一条不营销腔、可人工复核的回复。", + "commandCenter.agentQueue.prompt.twitter": "根据最新项目信号,起草 3 条 X 帖子和 1 条短 thread。要求具体、可复核。", + "commandCenter.agentQueue.prompt.linkedin": "基于最强项目洞察起草一条 LinkedIn 帖子。要求有用、具体、不硬卖。", + "commandCenter.agentQueue.prompt.hackerNews": "判断这个项目是否有适合 Hacker News 的角度。如果有,起草一条谨慎的帖子或评论。", + "commandCenter.agentQueue.prompt.content": "把最新发现转成一个内容选题和一份可复核的大纲。", + "commandCenter.agentQueue.prompt.report": "把最新监控结果整理成一份简洁简报,并列出下一步动作。", // Scan History "scan.latestScans": "最近扫描", diff --git a/frontend/src/pages/ProjectPage.tsx b/frontend/src/pages/ProjectPage.tsx index 73d7706..fc657d5 100644 --- a/frontend/src/pages/ProjectPage.tsx +++ b/frontend/src/pages/ProjectPage.tsx @@ -1,8 +1,9 @@ +import { useState } from "react"; import { useParams } from "react-router"; +import { ExternalLink, FileText, Hash, PauseCircle, PlayCircle, Users } from "lucide-react"; import { useProjectSummary } from "../hooks/useProject"; import { LoadingSpinner } from "../components/common/LoadingSpinner"; import { ErrorAlert } from "../components/common/ErrorAlert"; -import { ProjectHeader } from "../components/project/ProjectHeader"; import { ProjectTabs } from "../components/project/ProjectTabs"; import { ScorePanel } from "../components/project/ScorePanel"; import { ScanHistoryTable } from "../components/project/ScanHistoryTable"; @@ -10,14 +11,27 @@ import { CampaignTimeline } from "../components/project/CampaignTimeline"; import { ActionFeed } from "../components/project/ActionFeed"; import { InsightBanner } from "../components/dashboard/InsightBanner"; import { useI18n } from "../i18n"; -import { ProjectCommandCenter } from "../components/project/ProjectCommandCenter"; +import { useSetProjectPause } from "../hooks/useProject"; import { BlogGenerateButton } from "../components/project/BlogGenerateButton"; +import { AgentActionQueue } from "../components/project/AgentActionQueue"; +import { ProjectChatPanel } from "../components/project/ProjectChatPanel"; +import { ProjectConsoleStatus } from "../components/project/ProjectConsoleStatus"; + +function normalizeHost(url: string) { + try { + return new URL(url).hostname.replace(/^www\./, ""); + } catch { + return url; + } +} export function ProjectPage() { const { id } = useParams(); const projectId = Number(id); const { data, isLoading, error } = useProjectSummary(projectId); const { t } = useI18n(); + const setPause = useSetProjectPause(); + const [chatPrompt, setChatPrompt] = useState(); if (isLoading) return ; if (error) return ; @@ -35,58 +49,187 @@ export function ProjectPage() { pending_approvals, blog_drafts_count, } = data; + const categoryLabel = project.category === "auto" ? t("project.categoryAuto") : project.category; + const host = normalizeHost(project.url); + const keywordCount = keyword_count ?? 0; + const competitorCount = competitor_count ?? 0; + const pendingApprovals = pending_approvals ?? 0; + const blogDraftsCount = blog_drafts_count ?? 0; + const handleTogglePause = () => { + setPause.mutate({ id: project.id, pause: !is_paused }); + }; return ( -
- +
+ -
- } - /> - -
- + + +
+
+
+

+ {t("commandCenter.analytics.title")} +

+

+ {t("commandCenter.analytics.subtitle")} +

+
+ +
+ + + +
+ +
+
+ +
+ + +
+ +
-
-
- + +
-
- -
+
+
+ + {t("commandCenter.scanHistory")} + +
+
+
-
-
- -
- -
- - Scan History ▾ - -
- -
-
-
-
+
+

+ {t("commandCenter.reviewGuardrail.title")} +

+

{t("commandCenter.reviewGuardrail.body")}

+
); diff --git a/src/opencmo/tools/community.py b/src/opencmo/tools/community.py index a232f57..0728a3d 100644 --- a/src/opencmo/tools/community.py +++ b/src/opencmo/tools/community.py @@ -9,7 +9,6 @@ from agents import function_tool -from opencmo import llm from opencmo.tools.browser_pool import browser_slot from opencmo.tools.community_providers import ( PROVIDER_REGISTRY, @@ -92,7 +91,9 @@ def _render_stub_queries( def _has_tavily() -> bool: - return bool(llm.get_key("TAVILY_API_KEY")) + from opencmo.tools.tavily_helper import tavily_available + + return tavily_available() def _format_site_query(query: str, domains: list[str]) -> str: @@ -167,15 +168,17 @@ async def _search_external_platform( # Try Tavily first if use_tavily: try: - from tavily import AsyncTavilyClient - api_key = llm.get_key("TAVILY_API_KEY") - client = AsyncTavilyClient(api_key=api_key) - resp = await client.search( - query=_format_site_query(spec.query, domains), + from opencmo.tools.tavily_helper import tavily_search + + tavily_results = await tavily_search( + _format_site_query(spec.query, domains), max_results=5, search_depth="basic", ) - results = resp.get("results", []) if isinstance(resp, dict) else [] + results = [ + {"url": item.url, "title": item.title, "content": item.snippet} + for item in (tavily_results or []) + ] except Exception as exc: errors.append(f"external {provider_name} {spec.source}: {exc}") diff --git a/src/opencmo/tools/community_providers.py b/src/opencmo/tools/community_providers.py index 8d618c2..b1fabeb 100644 --- a/src/opencmo/tools/community_providers.py +++ b/src/opencmo/tools/community_providers.py @@ -1061,8 +1061,9 @@ def _has_api_key() -> bool: @staticmethod def _has_tavily() -> bool: - from opencmo import llm - return bool(llm.get_key("TAVILY_API_KEY")) + from opencmo.tools.tavily_helper import tavily_available + + return tavily_available() @property def is_enabled(self) -> bool: @@ -1196,19 +1197,17 @@ async def _search_via_api(self, query: str, source: str) -> tuple[list[Discussio async def _search_via_tavily(self, query: str, source: str) -> tuple[list[DiscussionHit], list[str]]: try: - from tavily import AsyncTavilyClient - except ImportError: - return [], ["tavily-python not installed"] - try: - from opencmo import llm - api_key = llm.get_key("TAVILY_API_KEY", "") - client = AsyncTavilyClient(api_key=api_key) - resp = await client.search( - query=f"{query} site:youtube.com", + from opencmo.tools.tavily_helper import tavily_search + + tavily_results = await tavily_search( + f"{query} site:youtube.com", max_results=10, search_depth="basic", ) - results = resp.get("results", []) if isinstance(resp, dict) else [] + results = [ + {"url": item.url, "title": item.title, "content": item.snippet} + for item in (tavily_results or []) + ] return self.parse_tavily_results(results, source), [] except Exception as e: return [], [f"tavily youtube {source}: {e}"] @@ -1465,8 +1464,9 @@ def _has_bearer_token() -> bool: @staticmethod def _has_tavily() -> bool: - from opencmo import llm - return bool(llm.get_key("TAVILY_API_KEY")) + from opencmo.tools.tavily_helper import tavily_available + + return tavily_available() @property def is_enabled(self) -> bool: @@ -1573,20 +1573,17 @@ async def _search_via_tweepy(self, query: str, source: str) -> tuple[list[Discus async def _search_via_tavily(self, query: str, source: str) -> tuple[list[DiscussionHit], list[str]]: errors: list[str] = [] try: - from tavily import AsyncTavilyClient - except ImportError: - return [], ["tavily-python not installed"] + from opencmo.tools.tavily_helper import tavily_search - try: - from opencmo import llm - api_key = llm.get_key("TAVILY_API_KEY", "") - client = AsyncTavilyClient(api_key=api_key) - resp = await client.search( - query=f"{query} site:x.com OR site:twitter.com", + tavily_results = await tavily_search( + f"{query} site:x.com OR site:twitter.com", max_results=10, search_depth="basic", ) - results = resp.get("results", []) if isinstance(resp, dict) else [] + results = [ + {"url": item.url, "title": item.title, "content": item.snippet} + for item in (tavily_results or []) + ] return self.parse_tavily_results(results, source), errors except Exception as e: return [], [f"tavily {source}: {e}"] diff --git a/src/opencmo/tools/search.py b/src/opencmo/tools/search.py index 6985ea3..efc5a81 100644 --- a/src/opencmo/tools/search.py +++ b/src/opencmo/tools/search.py @@ -16,26 +16,17 @@ async def web_search(query: str) -> str: Args: query: The search query string. """ - from opencmo import llm - if llm.get_key("TAVILY_API_KEY"): - try: - from tavily import AsyncTavilyClient - - client = AsyncTavilyClient() - response = await client.search( - query=query, max_results=5, search_depth="basic", - ) - results = response.get("results", []) - if results: - parts = [] - for r in results: - title = r.get("title", "") - url = r.get("url", "") - content = r.get("content", "") - parts.append(f"### {title}\n{url}\n\n{content}") - return "\n\n---\n\n".join(parts) - except Exception as exc: - logger.debug("Tavily search failed, trying fallback: %s", exc) + try: + from opencmo.tools.tavily_helper import tavily_search + + results = await tavily_search(query, max_results=5, search_depth="basic") + if results: + parts = [] + for result in results: + parts.append(f"### {result.title}\n{result.url}\n\n{result.snippet}") + return "\n\n---\n\n".join(parts) + except Exception as exc: + logger.debug("Tavily search failed, trying fallback: %s", exc) # 2. Fallback: OpenAI built-in web search (native provider only) from opencmo.config import is_custom_provider diff --git a/src/opencmo/tools/serp_tracker.py b/src/opencmo/tools/serp_tracker.py index 90a3b26..4c37306 100644 --- a/src/opencmo/tools/serp_tracker.py +++ b/src/opencmo/tools/serp_tracker.py @@ -186,23 +186,29 @@ def _get_client(self, api_key: str) -> AsyncTavilyClient: @property def is_enabled(self) -> bool: + from opencmo.tools.tavily_helper import tavily_available - from opencmo import llm - return bool(llm.get_key("TAVILY_API_KEY")) + return tavily_available() async def check_ranking( self, keyword: str, target_domain: str, num_results: int = 20 ) -> SerpResult: try: - from opencmo import llm - api_key = llm.get_key("TAVILY_API_KEY", "") - client = self._get_client(api_key) - response = await client.search(query=keyword, max_results=num_results) - results = response.get("results", []) + from opencmo.tools.tavily_helper import tavily_search + + results = await tavily_search(keyword, max_results=num_results) + if results is None: + return SerpResult( + position=None, + url_found=None, + total_results=0, + provider=self.name, + error="Tavily unavailable", + ) target = target_domain.lower().removeprefix("www.") for i, item in enumerate(results): - url = item.get("url", "") + url = item.url domain = urlparse(url).netloc.lower().removeprefix("www.") if _domain_matches(domain, target): return SerpResult( diff --git a/src/opencmo/tools/tavily_helper.py b/src/opencmo/tools/tavily_helper.py index f46f993..9df8b32 100644 --- a/src/opencmo/tools/tavily_helper.py +++ b/src/opencmo/tools/tavily_helper.py @@ -2,8 +2,10 @@ from __future__ import annotations +import json import logging from dataclasses import dataclass +from typing import Any logger = logging.getLogger(__name__) @@ -17,9 +19,144 @@ class TavilyResult: def tavily_available() -> bool: - """Return True if the Tavily API key is configured.""" + """Return True if either Tavily MCP or the official Tavily API is configured.""" from opencmo import llm - return bool(llm.get_key("TAVILY_API_KEY")) + return bool(_tavily_mcp_config() or llm.get_key("TAVILY_API_KEY")) + + +def _tavily_mcp_config() -> tuple[str, str] | None: + from opencmo import llm + + url = (llm.get_key("TAVILY_MCP_URL") or "").strip() + token = (llm.get_key("TAVILY_MCP_TOKEN") or "").strip() + if not url or not token: + return None + return url, token + + +def _decode_mcp_response(text: str) -> dict[str, Any]: + """Decode JSON-RPC over streamable HTTP/SSE into a response object.""" + for line in text.splitlines(): + if not line.startswith("data: "): + continue + payload = line[6:].strip() + if not payload or payload == "[DONE]": + continue + return json.loads(payload) + return json.loads(text) + + +def _mcp_content_payload(result: dict[str, Any]) -> Any: + """Return the useful payload from an MCP tools/call result.""" + if result.get("isError"): + content = result.get("content") or [] + message = "" + if content and isinstance(content[0], dict): + message = str(content[0].get("text") or "") + raise RuntimeError(message or "Tavily MCP tool returned an error") + + structured = result.get("structuredContent") + if structured is not None: + return structured + + for item in result.get("content") or []: + if not isinstance(item, dict): + continue + text = item.get("text") + if not isinstance(text, str) or not text.strip(): + continue + try: + return json.loads(text) + except json.JSONDecodeError: + return text + return None + + +async def _mcp_tool_call(name: str, arguments: dict[str, Any]) -> Any: + config = _tavily_mcp_config() + if not config: + return None + + import httpx + + url, token = config + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": name, + "arguments": arguments, + }, + } + headers = { + "Authorization": f"Bearer {token}", + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + } + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(url, json=payload, headers=headers) + response.raise_for_status() + + decoded = _decode_mcp_response(response.text) + if decoded.get("error"): + raise RuntimeError(decoded["error"]) + return _mcp_content_payload(decoded.get("result") or {}) + + +def _parse_search_results(response: dict[str, Any] | None) -> list[TavilyResult]: + results: list[TavilyResult] = [] + if not isinstance(response, dict): + return results + for item in response.get("results", []): + if not isinstance(item, dict): + continue + results.append(TavilyResult( + title=str(item.get("title") or ""), + url=str(item.get("url") or ""), + snippet=str(item.get("content") or item.get("snippet") or ""), + )) + return results + + +async def _mcp_search( + query: str, + *, + max_results: int, + search_depth: str, + topic: str, +) -> list[TavilyResult] | None: + payload = await _mcp_tool_call("tavily_search", { + "query": query, + "max_results": max_results, + "search_depth": search_depth, + "topic": topic, + }) + results = _parse_search_results(payload) + return results if results else None + + +async def _mcp_extract( + url: str, + *, + extract_depth: str, + format: str, +) -> str | None: + payload = await _mcp_tool_call("tavily_extract", { + "urls": [url], + "extract_depth": extract_depth, + "format": format, + }) + if isinstance(payload, dict): + for item in payload.get("results", []): + if not isinstance(item, dict): + continue + content = _extract_result_content(item) + if content: + return content + if isinstance(payload, str) and payload.strip(): + return payload.strip() + return None async def tavily_search( @@ -31,16 +168,30 @@ async def tavily_search( ) -> list[TavilyResult] | None: """Perform a Tavily search and return structured results. - Returns None if TAVILY_API_KEY is not set or the search fails, + Returns None if Tavily is not configured or the search fails, allowing callers to fall back to their existing logic. """ - if not tavily_available(): + if _tavily_mcp_config(): + try: + result = await _mcp_search( + query, + max_results=max_results, + search_depth=search_depth, + topic=topic, + ) + if result: + return result + except Exception as exc: + logger.warning("Tavily MCP search failed for %r: %s", query, exc) + + from opencmo import llm + if not llm.get_key("TAVILY_API_KEY"): return None try: from tavily import AsyncTavilyClient - client = AsyncTavilyClient() + client = AsyncTavilyClient(api_key=llm.get_key("TAVILY_API_KEY", "")) response = await client.search( query=query, max_results=max_results, @@ -48,14 +199,7 @@ async def tavily_search( topic=topic, ) - results = [] - for item in response.get("results", []): - results.append(TavilyResult( - title=item.get("title", ""), - url=item.get("url", ""), - snippet=item.get("content", ""), - )) - return results + return _parse_search_results(response) except Exception as exc: logger.warning("Tavily search failed for %r: %s", query, exc) @@ -81,13 +225,21 @@ async def tavily_extract( Returns None when Tavily is unavailable, extraction fails, or the response contains no usable content so callers can fall back to crawl-based fetching. """ - if not tavily_available(): + if _tavily_mcp_config(): + try: + content = await _mcp_extract(url, extract_depth=extract_depth, format=format) + if content: + return content + except Exception as exc: + logger.warning("Tavily MCP extract failed for %r: %s", url, exc) + + from opencmo import llm + if not llm.get_key("TAVILY_API_KEY"): return None try: from tavily import AsyncTavilyClient - from opencmo import llm client = AsyncTavilyClient(api_key=llm.get_key("TAVILY_API_KEY")) response = await client.extract( urls=[url], diff --git a/tests/conftest.py b/tests/conftest.py index cba927f..b4b3256 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,8 @@ EXTERNAL_PROVIDER_KEYS = ( "TAVILY_API_KEY", + "TAVILY_MCP_URL", + "TAVILY_MCP_TOKEN", "YOUTUBE_API_KEY", "TWITTER_BEARER_TOKEN", "DATAFORSEO_LOGIN", diff --git a/tests/test_tavily_mcp.py b/tests/test_tavily_mcp.py new file mode 100644 index 0000000..bd3459f --- /dev/null +++ b/tests/test_tavily_mcp.py @@ -0,0 +1,109 @@ +"""Tests for Tavily MCP proxy support.""" + +import json + +import pytest + + +class _FakeResponse: + def __init__(self, payload: dict): + self.text = "event: message\ndata: " + json.dumps(payload) + "\n\n" + + def raise_for_status(self) -> None: + return None + + +class _FakeAsyncClient: + def __init__(self, calls: list[dict], payload: dict, **_: object): + self._calls = calls + self._payload = payload + + async def __aenter__(self): + return self + + async def __aexit__(self, *_: object) -> None: + return None + + async def post(self, url: str, *, json: dict, headers: dict): + self._calls.append({"url": url, "json": json, "headers": headers}) + return _FakeResponse(self._payload) + + +@pytest.mark.asyncio +async def test_tavily_search_uses_mcp_proxy(monkeypatch): + from opencmo.tools.tavily_helper import tavily_search + + calls: list[dict] = [] + tool_payload = { + "results": [ + { + "title": "OpenCMO", + "url": "https://www.aidcmo.com/", + "content": "AI CMO command center", + } + ] + } + rpc_payload = { + "jsonrpc": "2.0", + "id": 1, + "result": {"content": [{"type": "text", "text": json.dumps(tool_payload)}]}, + } + monkeypatch.setenv("TAVILY_MCP_URL", "https://tavily.example.test/mcp") + monkeypatch.setenv("TAVILY_MCP_TOKEN", "secret-token") + monkeypatch.delenv("TAVILY_API_KEY", raising=False) + + import httpx + + monkeypatch.setattr( + httpx, + "AsyncClient", + lambda **kwargs: _FakeAsyncClient(calls, rpc_payload, **kwargs), + ) + + results = await tavily_search("OpenCMO", max_results=1) + + assert results is not None + assert results[0].title == "OpenCMO" + assert results[0].url == "https://www.aidcmo.com/" + assert calls[0]["url"] == "https://tavily.example.test/mcp" + assert calls[0]["headers"]["Authorization"] == "Bearer secret-token" + assert calls[0]["json"]["params"]["name"] == "tavily_search" + assert calls[0]["json"]["params"]["arguments"]["query"] == "OpenCMO" + + +@pytest.mark.asyncio +async def test_tavily_extract_uses_mcp_proxy(monkeypatch): + from opencmo.tools.tavily_helper import tavily_extract + + calls: list[dict] = [] + tool_payload = { + "results": [ + { + "url": "https://www.aidcmo.com/", + "raw_content": "# OpenCMO\n\nAI growth tools.", + } + ] + } + rpc_payload = { + "jsonrpc": "2.0", + "id": 1, + "result": {"content": [{"type": "text", "text": json.dumps(tool_payload)}]}, + } + monkeypatch.setenv("TAVILY_MCP_URL", "https://tavily.example.test/mcp") + monkeypatch.setenv("TAVILY_MCP_TOKEN", "secret-token") + monkeypatch.delenv("TAVILY_API_KEY", raising=False) + + import httpx + + monkeypatch.setattr( + httpx, + "AsyncClient", + lambda **kwargs: _FakeAsyncClient(calls, rpc_payload, **kwargs), + ) + + content = await tavily_extract("https://www.aidcmo.com/", extract_depth="advanced") + + assert content == "# OpenCMO\n\nAI growth tools." + assert calls[0]["json"]["params"]["name"] == "tavily_extract" + assert calls[0]["json"]["params"]["arguments"]["urls"] == ["https://www.aidcmo.com/"] + assert calls[0]["json"]["params"]["arguments"]["extract_depth"] == "advanced"