diff --git a/community/portfolio-monitor/README.md b/community/portfolio-monitor/README.md new file mode 100644 index 00000000..037e75dc --- /dev/null +++ b/community/portfolio-monitor/README.md @@ -0,0 +1,108 @@ +# Portfolio Monitor + +A passive background ability that tracks your stock portfolio in real time, fires proactive alerts when positions move beyond your thresholds, and delivers a full P&L breakdown on demand — all by voice. + +Just add your stocks once and it handles the rest: morning open summary, live price monitoring every 5 minutes during market hours, and an end-of-day wrap-up when the bell rings. + +## Setup + +1. Get a free API key at [finnhub.io](https://finnhub.io) (60 calls/minute, no daily cap) +2. In OpenHome, go to **Settings → API Keys** and add your key as `finnhub_api_key` +3. Optionally get an [Alpha Vantage](https://www.alphavantage.co) key (25 calls/day free) and add it as `alphavantage_api_key` + +## Trigger Phrases + +- `portfolio monitor` / `my portfolio` / `portfolio update` +- `check my stocks` / `stock update` / `stock check` +- `check Apple` / `how's Tesla` / `how's NVDA doing` +- `compare my stocks` / `day over day` / `versus yesterday` / `how did my stocks do` +- `add a stock` / `add to portfolio` / `log a stock` +- `update my position` / `bought more` / `I sold` / `add more shares` / `sold some` +- `remove from portfolio` / `remove a stock` +- `set a stock alert` / `price alert` +- `biggest movers` / `what's moving` / `gainers today` / `losers today` +- `how are the markets` / `market update` / `market pulse` / `market today` +- `clear my portfolio` / `wipe my portfolio` + +## Features + +**Passive Monitoring** +- Polls every 5 minutes during market hours (9:30am–4:00pm ET, Mon–Fri) +- Sleeps 30 minutes outside market hours — no wasted API calls +- Price cache with 3-minute TTL prevents redundant fetches + +**Proactive Alerts** +- Morning open: brief summary of what you're tracking when market opens +- Price alerts: fires immediately when a stock drops or rises beyond your threshold (day change %) +- End-of-day wrap-up: portfolio value, day P&L, top gainer and loser +- Each alert fires at most once per day per direction per stock + +**Interactive Queries** +- PORTFOLIO: opens with a quick snapshot (total value, today's P&L, overall P&L), then offers navigation — say 'breakdown', 'compare', 'movers', 'market', or a stock name +- BREAKDOWN: full per-stock detail — price, day change %, position value, and overall P&L (chunked at 4 stocks, asks if you want more) +- COMPARE: day-over-day view — each stock's price move and dollar change vs yesterday's close +- CHECK: current price, day change, and position P&L for a specific stock; follow-up loop to check multiple stocks back-to-back +- MOVERS: biggest gainer and loser in your portfolio today +- MARKET: live pulse on S&P 500, Nasdaq, and Dow Jones +- ADD: add a stock by name or ticker — specify shares and avg cost in one shot or via follow-up prompts; loop to add multiple stocks; offers UPDATE if stock already exists +- UPDATE: modify an existing position — bought more shares (recalculates weighted avg cost), sold some (reduces share count or removes if fully sold), or correct/overwrite values +- SET_ALERT: set drop/rise percentage thresholds per stock; loop to set alerts for multiple stocks +- REMOVE: remove a stock from your portfolio (with confirmation) +- CLEAR: wipe the entire portfolio (with confirmation) + +**Smart Details** +- LLM intent router handles natural, complex requests reliably ("what happened to my Google position", "how's the portfolio looking") +- Resolves company names to tickers (say "Apple", not "AAPL"); LLM fallback for any company not in the built-in map +- Finnhub primary API with Alpha Vantage fallback +- On-demand price fetch if cache is empty; TTL-aware refresh so data is always current +- Market-hours-aware ET timezone detection (DST handled, no external library) + +## Example Conversation + +> **You:** my portfolio +> +> **Assistant:** Portfolio at $47,200 — down $380 today, up $3,100 (7%) overall. Say 'breakdown' for full detail, 'compare' for day-over-day, 'movers' for biggest movers, 'market' for market indices, or a stock name. +> +> **You:** market +> +> **Assistant:** Markets today: S&P 500 down 0.8%, Nasdaq down 1.2%, Dow Jones down 0.5%. +> +> **You:** how's Apple doing +> +> **Assistant:** Apple is at $189, down 1% today. Your 20 shares are worth $3,780 — up $620 (20%) on your position. +> +> **You:** breakdown +> +> **Assistant:** Apple: $189, down 1% today, up $620 (20%) overall. Tesla: $242, down 2% today, down $180 (4%) overall. Nvidia: $875, up 3% today, up $4,100 (68%) overall. Microsoft: $415, flat today, up $900 (12%) overall. Want to hear the rest? +> +> **You:** yes +> +> **Assistant:** Amazon: $198, down 0% today, up $260 (7%) overall. + +--- + +> **You:** I bought more Tesla +> +> **Assistant:** Tesla: 10 shares at $220 average, total cost $2,200. Say 'bought more', 'sold some', or 'correct' to overwrite. +> +> **You:** bought more +> +> **Assistant:** How many shares did you buy, and at what price? Say both — like '5 at 210'. +> +> **You:** 5 at 245 +> +> **Assistant:** Updated — Tesla now 15 shares at $228 average. + +--- + +> **[Background alert, proactive]** +> +> **Assistant:** Heads up — Nvidia is down 5% today. Your position is up $3,200. Say 'portfolio monitor' to review. + +## Notes + +- Price alerts are based on the day's change percentage (vs previous close), not vs your avg cost +- The background daemon only runs while OpenHome is active — not a 24/7 service +- Supports 50+ major US companies by name out of the box; any ticker symbol works directly +- Trigger phrases for UPDATE: `update my position`, `bought more`, `I sold`, `add more shares`, `sold some` +- Trigger phrases for MARKET: `how are the markets`, `market update`, `market pulse`, `market today` diff --git a/community/portfolio-monitor/__init__.py b/community/portfolio-monitor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/portfolio-monitor/background.py b/community/portfolio-monitor/background.py new file mode 100644 index 00000000..4ed1d69c --- /dev/null +++ b/community/portfolio-monitor/background.py @@ -0,0 +1,448 @@ +import requests +from datetime import datetime, timedelta + +from src.agent.capability import MatchingCapability +from src.agent.capability_worker import CapabilityWorker +from src.main import AgentWorker + +STORAGE_KEY = "portfolio_data" +POLL_MARKET_OPEN = 300.0 +POLL_MARKET_CLOSED = 1800.0 +POLL_NO_HOLDINGS = 30.0 +CACHE_TTL_SECONDS = 180 +MAX_API_CALLS_PER_POLL = 50 + +FINNHUB_BASE = "https://finnhub.io/api/v1" +AV_BASE = "https://www.alphavantage.co/query" + + +def _empty_data() -> dict: + return { + "holdings": [], + "alert_thresholds": {}, + "price_cache": {}, + "alerted_today": [], + "meta": { + "api_calls_today": 0, + "api_calls_date": "", + "last_eod_summary": "", + }, + } + + +def _new_state() -> dict: + return { + "current_day": "", + "open_notified": False, + "eod_spoken": False, + } + + +class PortfolioMonitorBackground(MatchingCapability): + worker: AgentWorker = None + capability_worker: CapabilityWorker = None + background_daemon_mode: bool = False + finnhub_key: str = "" + av_key: str = "" + + # Do not change following tag of register capability + # {{register capability}} + + # ------------------------------------------------------------------ + # Context Storage + # ------------------------------------------------------------------ + + def _load_data(self) -> dict: + try: + result = self.capability_worker.get_single_key(STORAGE_KEY) + if result and result.get("value"): + return result["value"] + return _empty_data() + except Exception as e: + self.worker.editor_logging_handler.error(f"[PortfolioMonitor] Load error: {e}") + return _empty_data() + + def _save_data(self, data: dict): + try: + self.capability_worker.update_key(STORAGE_KEY, data) + except Exception: + try: + self.capability_worker.create_key(STORAGE_KEY, data) + except Exception as e: + self.worker.editor_logging_handler.error(f"[PortfolioMonitor] Save error: {e}") + + # ------------------------------------------------------------------ + # Market hours (ET, no pytz) + # ------------------------------------------------------------------ + + def _et_now(self) -> datetime: + utc = datetime.utcnow() + m, y = utc.month, utc.year + if 4 <= m <= 10: + offset = -4 + elif m == 3: + first_day = datetime(y, 3, 1) + first_sun = first_day + timedelta(days=(6 - first_day.weekday()) % 7) + second_sun = first_sun + timedelta(days=7) + # 2am EST = 7am UTC + offset = -4 if utc >= second_sun.replace(hour=7) else -5 + elif m == 11: + first_day = datetime(y, 11, 1) + first_sun = first_day + timedelta(days=(6 - first_day.weekday()) % 7) + # 2am EDT = 6am UTC + offset = -5 if utc >= first_sun.replace(hour=6) else -4 + else: + offset = -5 + return utc + timedelta(hours=offset) + + def _is_market_open_et(self, et: datetime) -> bool: + if et.weekday() >= 5: + return False + mins = et.hour * 60 + et.minute + return 9 * 60 + 30 <= mins < 16 * 60 + + def _is_eod_window_et(self, et: datetime) -> bool: + if et.weekday() >= 5: + return False + mins = et.hour * 60 + et.minute + return 16 * 60 <= mins <= 16 * 60 + 10 + + # ------------------------------------------------------------------ + # API + # ------------------------------------------------------------------ + + def _fetch_quote_finnhub(self, ticker: str) -> dict | None: + try: + resp = requests.get( + f"{FINNHUB_BASE}/quote", + params={"symbol": ticker, "token": self.finnhub_key}, + timeout=10, + ) + if resp.status_code == 200: + d = resp.json() + price = d.get("c", 0) + if price: + return { + "price": float(price), + "change_pct": float(d.get("dp", 0)), + "prev_close": float(d.get("pc", 0)), + "high": float(d.get("h", 0)), + "low": float(d.get("l", 0)), + } + return None + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] Finnhub error for {ticker}: {e}" + ) + return None + + def _fetch_quote_av(self, ticker: str) -> dict | None: + if not self.av_key: + return None + try: + resp = requests.get( + AV_BASE, + params={"function": "GLOBAL_QUOTE", "symbol": ticker, "apikey": self.av_key}, + timeout=10, + ) + if resp.status_code == 200: + gq = resp.json().get("Global Quote", {}) + price = float(gq.get("05. price", 0)) + if price: + raw_pct = gq.get("10. change percent", "0%").replace("%", "") + return { + "price": price, + "change_pct": float(raw_pct), + "prev_close": float(gq.get("08. previous close", 0)), + "high": float(gq.get("03. high", 0)), + "low": float(gq.get("04. low", 0)), + } + return None + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] AV error for {ticker}: {e}" + ) + return None + + def _fetch_quote(self, ticker: str) -> dict | None: + quote = self._fetch_quote_finnhub(ticker) + if quote: + return quote + return self._fetch_quote_av(ticker) + + def _is_cache_fresh(self, ticker: str, data: dict) -> bool: + entry = data.get("price_cache", {}).get(ticker) + if not entry: + return False + try: + cached_at = datetime.strptime(entry["cached_at"], "%Y-%m-%dT%H:%M:%S") + return (datetime.utcnow() - cached_at).total_seconds() < CACHE_TTL_SECONDS + except Exception: + return False + + # ------------------------------------------------------------------ + # Alert logic + # ------------------------------------------------------------------ + + def _check_alerts(self, ticker: str, holding: dict, quote: dict, data: dict) -> list: + thresholds = data.get("alert_thresholds", {}).get(ticker, {}) + drop_pct = thresholds.get("drop_pct") + rise_pct = thresholds.get("rise_pct") + if not drop_pct and not rise_pct: + return [] + + change_pct = quote.get("change_pct", 0) + price = quote.get("price", 0) + shares = holding.get("shares", 0) + avg_cost = holding.get("avg_cost", 0) + name = holding.get("name", ticker) + + alerts = [] + + if drop_pct and change_pct <= -drop_pct: + pnl = (price - avg_cost) * shares + pnl_str = f"down ${abs(pnl):,.0f}" if pnl < 0 else f"up ${pnl:,.0f}" + msg = ( + f"Heads up — {name} is down {abs(change_pct):.0f} percent today. " + f"Your position is {pnl_str}. Say 'portfolio monitor' to review." + ) + alerts.append(("drop", msg)) + + if rise_pct and change_pct >= rise_pct: + pnl = (price - avg_cost) * shares + pnl_str = f"up ${pnl:,.0f}" if pnl >= 0 else f"down ${abs(pnl):,.0f}" + msg = ( + f"Nice — {name} is up {change_pct:.0f} percent today. " + f"Your position is {pnl_str}. Say 'portfolio monitor' to review." + ) + alerts.append(("rise", msg)) + + return alerts + + # ------------------------------------------------------------------ + # Proactive voice + # ------------------------------------------------------------------ + + async def _speak_morning_open(self, data: dict): + holdings = data.get("holdings", []) + count = len(holdings) + names = ", ".join(h.get("name", h["ticker"]) for h in holdings[:3]) + suffix = f" and {count - 3} more" if count > 3 else "" + try: + await self.capability_worker.send_interrupt_signal() + await self.capability_worker.speak( + f"Market just opened. You're tracking {count} " + f"{'stock' if count == 1 else 'stocks'}: {names}{suffix}. " + "Say 'portfolio monitor' for an update." + ) + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] Morning open error: {e}" + ) + + async def _speak_eod_summary(self, data: dict): + holdings = data.get("holdings", []) + if not holdings: + return + + # Force fresh quotes at close so EOD summary reflects final prices + cache = data.get("price_cache", {}) + changed = False + for h in holdings: + ticker = h["ticker"] + quote = self._fetch_quote(ticker) + if quote: + cache[ticker] = { + **quote, + "cached_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"), + } + changed = True + if changed: + data["price_cache"] = cache + self._save_data(data) + + total_value = 0.0 + total_cost = 0.0 + day_pnl = 0.0 + best = None + best_pct = -float("inf") + worst = None + worst_pct = float("inf") + + for h in holdings: + ticker = h["ticker"] + q = cache.get(ticker) + if not q: + continue + price = q["price"] + prev_close = q.get("prev_close", price) + shares = h.get("shares", 0) + avg_cost = h.get("avg_cost", 0) + change_pct = q.get("change_pct", 0) + + total_value += price * shares + total_cost += avg_cost * shares + day_pnl += (price - prev_close) * shares + + if change_pct > best_pct: + best_pct = change_pct + best = h.get("name", ticker) + if change_pct < worst_pct: + worst_pct = change_pct + worst = h.get("name", ticker) + + if not total_value: + return + + day_dir = "up" if day_pnl >= 0 else "down" + overall_pnl = total_value - total_cost + overall_dir = "up" if overall_pnl >= 0 else "down" + + parts = [ + f"Market's closed. Your portfolio ended at ${total_value:,.0f}, " + f"{day_dir} ${abs(day_pnl):,.0f} today, " + f"{overall_dir} ${abs(overall_pnl):,.0f} overall." + ] + if best and best_pct > 0: + parts.append(f"{best} led, up {best_pct:.0f} percent.") + if worst and worst_pct < 0: + parts.append(f"{worst} lagged, down {abs(worst_pct):.0f} percent.") + + try: + await self.capability_worker.send_interrupt_signal() + await self.capability_worker.speak(" ".join(parts)) + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] EOD summary error: {e}" + ) + + # ------------------------------------------------------------------ + # Main daemon loop + # ------------------------------------------------------------------ + + async def watch_loop(self): + self.finnhub_key = self.capability_worker.get_api_keys("finnhub_api_key") or "" + self.av_key = self.capability_worker.get_api_keys("alphavantage_api_key") or "" + + if not self.finnhub_key: + self.worker.editor_logging_handler.warning( + "[PortfolioMonitor] No Finnhub API key — daemon idle until key is configured." + ) + + s = _new_state() + self.worker.editor_logging_handler.info("[PortfolioMonitor] daemon started") + self.capability_worker.resume_normal_flow() + + while True: + sleep_time = POLL_MARKET_CLOSED + try: + if not self.finnhub_key: + await self.worker.session_tasks.sleep(POLL_NO_HOLDINGS) + continue + + data = self._load_data() + holdings = data.get("holdings", []) + + self.worker.editor_logging_handler.info( + f"[PortfolioMonitor] daemon tick — {len(holdings)} holding(s)" + ) + if not holdings: + await self.worker.session_tasks.sleep(POLL_NO_HOLDINGS) + continue + + et = self._et_now() + today_str = et.strftime("%Y-%m-%d") + + # Reset daily state + if today_str != s["current_day"]: + s["current_day"] = today_str + s["open_notified"] = False + s["eod_spoken"] = False + data["alerted_today"] = [] + meta = data.setdefault("meta", {}) + meta["api_calls_today"] = 0 + meta["api_calls_date"] = today_str + self._save_data(data) + + market_open = self._is_market_open_et(et) + + # Morning open notification + if market_open and not s["open_notified"]: + s["open_notified"] = True + await self._speak_morning_open(data) + + # EOD summary + if self._is_eod_window_et(et) and not s["eod_spoken"]: + await self._speak_eod_summary(data) + s["eod_spoken"] = True + + # Price polling during market hours + if market_open: + sleep_time = POLL_MARKET_OPEN + calls_this_poll = 0 + changed = False + pending_alerts = [] + + for holding in holdings: + if calls_this_poll >= MAX_API_CALLS_PER_POLL: + break + ticker = holding["ticker"] + if self._is_cache_fresh(ticker, data): + continue + + quote = self._fetch_quote(ticker) + calls_this_poll += 1 + if not quote: + continue + + data.setdefault("price_cache", {})[ticker] = { + **quote, + "cached_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"), + } + data.setdefault("meta", {}) + data["meta"]["api_calls_today"] = ( + data["meta"].get("api_calls_today", 0) + 1 + ) + changed = True + + self.worker.editor_logging_handler.info( + f"[PortfolioMonitor] {ticker}: ${quote['price']:.2f} " + f"({quote['change_pct']:+.1f}%)" + ) + + alerts = self._check_alerts(ticker, holding, quote, data) + for direction, msg in alerts: + alert_key = f"{ticker}_{direction}" + if alert_key not in data.get("alerted_today", []): + data.setdefault("alerted_today", []).append(alert_key) + pending_alerts.append(msg) + changed = True + + if changed: + self._save_data(data) + + for msg in pending_alerts: + try: + await self.capability_worker.send_interrupt_signal() + await self.capability_worker.speak(msg) + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] Alert error: {e}" + ) + + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] Loop error: {e}" + ) + + await self.worker.session_tasks.sleep(sleep_time) + + # ------------------------------------------------------------------ + # Entry point + # ------------------------------------------------------------------ + + def call(self, worker: AgentWorker, background_daemon_mode: bool): + self.worker = worker + self.capability_worker = CapabilityWorker(self.worker) + self.background_daemon_mode = background_daemon_mode + self.worker.session_tasks.create(self.watch_loop()) diff --git a/community/portfolio-monitor/main.py b/community/portfolio-monitor/main.py new file mode 100644 index 00000000..d08129e5 --- /dev/null +++ b/community/portfolio-monitor/main.py @@ -0,0 +1,1187 @@ +import re +import requests +from datetime import datetime + +from src.agent.capability import MatchingCapability +from src.agent.capability_worker import CapabilityWorker +from src.main import AgentWorker + +FINNHUB_BASE = "https://finnhub.io/api/v1" +AV_BASE = "https://www.alphavantage.co/query" +STORAGE_KEY = "portfolio_data" +CACHE_TTL_SECONDS = 180 + +HOTWORDS = { + "portfolio monitor", "my portfolio", "check my stocks", "stock update", + "portfolio update", "my stocks", "stock alert", "stock check", + "add to portfolio", "add a stock", "log a stock", "track a stock", + "remove from portfolio", "remove a stock", + "set stock alert", "set a stock alert", "stock price alert", + "what's moving", "biggest movers", "gainers today", "losers today", + "clear my portfolio", "wipe my portfolio", + "compare my stocks", "compare my portfolio", "day over day", + "how did my stocks do", "stock comparison", "versus yesterday", + "update my position", "bought more", "i sold", "add more shares", + "change my position", "update my stock", "sold some", + "how are the markets", "market update", "market pulse", + "what's the market doing", "market today", "market check", + "check apple", "check tesla", "check nvidia", "check amazon", + "check microsoft", "check google", "check meta", "check netflix", + "how's apple", "how's tesla", "how's nvidia", "how's amazon", + "how's microsoft", "how's google", "how's meta", "how's netflix", +} + +_MARKET_INDICES = [ + ("SPY", "S&P 500"), + ("QQQ", "Nasdaq"), + ("DIA", "Dow Jones"), +] + +TICKER_MAP = { + "apple": "AAPL", + "tesla": "TSLA", + "microsoft": "MSFT", + "amazon": "AMZN", + "google": "GOOGL", + "alphabet": "GOOGL", + "nvidia": "NVDA", + "meta": "META", + "facebook": "META", + "netflix": "NFLX", + "disney": "DIS", + "salesforce": "CRM", + "visa": "V", + "mastercard": "MA", + "jpmorgan": "JPM", + "jp morgan": "JPM", + "johnson": "JNJ", + "walmart": "WMT", + "exxon": "XOM", + "chevron": "CVX", + "unitedhealth": "UNH", + "home depot": "HD", + "adobe": "ADBE", + "paypal": "PYPL", + "intel": "INTC", + "amd": "AMD", + "qualcomm": "QCOM", + "broadcom": "AVGO", + "uber": "UBER", + "lyft": "LYFT", + "airbnb": "ABNB", + "spotify": "SPOT", + "shopify": "SHOP", + "palantir": "PLTR", + "coinbase": "COIN", + "robinhood": "HOOD", + "snapchat": "SNAP", + "snap": "SNAP", + "twitter": "X", + "berkshire": "BRK.B", + "boeing": "BA", + "ford": "F", + "general motors": "GM", + "gm": "GM", + "bank of america": "BAC", + "wells fargo": "WFC", + "citigroup": "C", + "goldman sachs": "GS", + "morgan stanley": "MS", +} + +COMMON_WORDS = { + "A", "I", "AN", "AS", "AT", "BE", "BY", "DO", "GO", "HE", "IF", "IN", + "IS", "IT", "ME", "MY", "NO", "OF", "ON", "OR", "SO", "TO", "UP", "US", + "WE", "ADD", "AND", "ARE", "BUT", "CAN", "FOR", "GET", "GOT", "HAD", + "HAS", "HIM", "HIS", "HOW", "LET", "NOT", "NOW", "OFF", "OUT", "OWN", + "PUT", "SAY", "SEE", "SET", "THE", "TOO", "TWO", "USE", "WAS", "WHO", + "WHY", "YES", "YET", "YOU", "ALL", "NEW", "OLD", "OUR", "TOP", +} + +_EXIT_PATTERN = re.compile( + r'\b(stop|exit|quit|done|cancel|bye|goodbye|never\s*mind|no\s*thanks|' + r"that'?s\s*all|nothing|nah|skip)\b", + re.IGNORECASE, +) + +_ADD_COMMAND_PHRASES = { + "add a stock", "add to portfolio", "log a stock", "track a stock", + "add stock", "new stock", "add another", "add another stock", +} + +_AFFIRMATIVE_PATTERN = re.compile( + r'\b(yes|yeah|sure|yep|absolutely|ok|okay|go ahead)\b', + re.IGNORECASE, +) + +_VALID_INTENTS = frozenset({ + "PORTFOLIO", "CHECK", "COMPARE", "MOVERS", "ADD", "UPDATE", + "SET_ALERT", "REMOVE", "CLEAR", "MARKET" +}) + +_HUB_ACTIONS = frozenset({"BREAKDOWN", "COMPARE", "MOVERS", "MARKET", "CHECK", "UNKNOWN"}) + + +def _empty_data() -> dict: + return { + "holdings": [], + "alert_thresholds": {}, + "price_cache": {}, + "alerted_today": [], + "meta": { + "api_calls_today": 0, + "api_calls_date": "", + "last_eod_summary": "", + }, + } + + +class PortfolioMonitorCapability(MatchingCapability): + worker: AgentWorker = None + capability_worker: CapabilityWorker = None + finnhub_key: str = "" + av_key: str = "" + + # Do not change following tag of register capability + # {{register capability}} + + # ------------------------------------------------------------------ + # Hotword matching + # ------------------------------------------------------------------ + + def does_match(self, text: str) -> bool: + t = text.lower().strip() + if any(hw in t for hw in HOTWORDS): + return True + # "check [company/ticker]" or "how's [company/ticker] doing" + if re.search(r'\b(check|how.?s|how is)\b', t) and "portfolio" not in t: + return bool(self._resolve_ticker_cheap(text)) + # "add Apple" / "add NVDA" — static map + ticker pattern, no LLM + if re.search(r'\badd\b', t) and "portfolio" not in t: + for company in TICKER_MAP: + if company in t: + return True + for match in re.finditer(r'\b([A-Z]{2,5})\b', text.upper()): + if match.group(1) not in COMMON_WORDS: + return True + return False + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _is_exit(self, text: str) -> bool: + if not text or not text.strip(): + return True + stripped = text.strip().rstrip(".,!?").strip().lower() + if stripped in ("no", "skip"): + return True + return bool(_EXIT_PATTERN.search(text)) + + def _classify_intent(self, text: str) -> str: + t = text.lower() + + # Cheap pre-filter for unambiguous destructive actions only + if any(kw in t for kw in ("clear", "wipe", "reset")) and any( + kw in t for kw in ("portfolio", "stocks", "holdings", "all") + ): + return "CLEAR" + if re.search(r'\b(remove|delete)\b', t) or ( + re.search(r'\bdrop\b', t) and any( + kw in t for kw in ("portfolio", "holding", "position", "from my", "from the") + ) + ): + return "REMOVE" + + try: + raw = self.capability_worker.text_to_text_response( + "Route this request for a stock portfolio voice assistant.\n" + "Pick exactly one intent:\n" + "PORTFOLIO — view overall portfolio value and P&L\n" + "CHECK — price or status of a specific stock\n" + "COMPARE — day-over-day: how each stock moved vs yesterday's close\n" + "MOVERS — biggest gainer and loser in the portfolio today\n" + "ADD — add a new stock to the portfolio\n" + "UPDATE — modify an existing position: bought more shares, sold some, or correct avg cost\n" + "SET_ALERT — set a price drop or rise alert for a stock\n" + "REMOVE — remove a stock from the portfolio\n" + "CLEAR — wipe the entire portfolio\n" + "MARKET — broad market overview: S&P 500, Nasdaq, Dow Jones\n\n" + "Reply with ONLY the intent label.\n" + f"User input: {text.strip() or '(portfolio update)'}" + ) + intent = raw.strip().upper().split()[0].strip(".,") + return intent if intent in _VALID_INTENTS else "PORTFOLIO" + except Exception: + return "PORTFOLIO" + + def _classify_hub_action(self, text: str) -> str: + try: + raw = self.capability_worker.text_to_text_response( + "The user is inside a portfolio dashboard. Route their request:\n" + "BREAKDOWN — full per-stock detail list\n" + "COMPARE — day-over-day comparison vs yesterday's close\n" + "MOVERS — biggest gainer and loser today\n" + "MARKET — broad market indices (S&P 500, Nasdaq, Dow)\n" + "CHECK — specific stock price or status\n" + "UNKNOWN — none of the above\n\n" + "Reply with ONLY the label.\n" + f"User input: {text}" + ) + result = raw.strip().upper().split()[0].strip(".,") + return result if result in _HUB_ACTIONS else "UNKNOWN" + except Exception: + return "UNKNOWN" + + def _resolve_ticker_cheap(self, text: str) -> str | None: + """Static-only ticker resolution — safe to call from does_match (no LLM).""" + if not text: + return None + lower = text.lower() + for company, ticker in sorted(TICKER_MAP.items(), key=lambda x: -len(x[0])): + if company in lower: + return ticker + for match in re.finditer(r'\b([A-Z]{2,5})\b', text.upper()): + candidate = match.group(1) + if candidate not in COMMON_WORDS: + return candidate + return None + + def _resolve_ticker(self, text: str) -> str | None: + result = self._resolve_ticker_cheap(text) + if result: + return result + return self._resolve_ticker_llm(text) + + def _resolve_ticker_llm(self, text: str) -> str | None: + try: + raw = self.capability_worker.text_to_text_response( + "Extract the US stock ticker symbol from this text. " + "Return ONLY the ticker (e.g. AAPL, TSLA) or 'NONE' if not found.\n" + f"Text: {text}" + ) + result = raw.strip().upper().split()[0].strip(".,") if raw.strip() else "NONE" + return None if result == "NONE" else result + except Exception: + return None + + def _resolve_company_name(self, ticker: str) -> str: + for company, t in TICKER_MAP.items(): + if t == ticker: + return company.title() + try: + raw = self.capability_worker.text_to_text_response( + f"What company does the US stock ticker {ticker} represent? " + "Reply with ONLY the company name, nothing else." + ) + return raw.strip() or ticker + except Exception: + return ticker + + # ------------------------------------------------------------------ + # API + # ------------------------------------------------------------------ + + def _fetch_quote_finnhub(self, ticker: str) -> dict | None: + try: + resp = requests.get( + f"{FINNHUB_BASE}/quote", + params={"symbol": ticker, "token": self.finnhub_key}, + timeout=10, + ) + if resp.status_code == 200: + d = resp.json() + price = d.get("c", 0) + if price: + return { + "price": float(price), + "change_pct": float(d.get("dp", 0)), + "prev_close": float(d.get("pc", 0)), + "high": float(d.get("h", 0)), + "low": float(d.get("l", 0)), + } + return None + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] Finnhub error for {ticker}: {e}" + ) + return None + + def _fetch_quote_av(self, ticker: str) -> dict | None: + if not self.av_key: + return None + try: + resp = requests.get( + AV_BASE, + params={"function": "GLOBAL_QUOTE", "symbol": ticker, "apikey": self.av_key}, + timeout=10, + ) + if resp.status_code == 200: + gq = resp.json().get("Global Quote", {}) + price = float(gq.get("05. price", 0)) + if price: + raw_pct = gq.get("10. change percent", "0%").replace("%", "") + return { + "price": price, + "change_pct": float(raw_pct), + "prev_close": float(gq.get("08. previous close", 0)), + "high": float(gq.get("03. high", 0)), + "low": float(gq.get("04. low", 0)), + } + return None + except Exception as e: + self.worker.editor_logging_handler.error( + f"[PortfolioMonitor] AV error for {ticker}: {e}" + ) + return None + + def _fetch_quote(self, ticker: str) -> dict | None: + quote = self._fetch_quote_finnhub(ticker) + if quote: + return quote + return self._fetch_quote_av(ticker) + + # ------------------------------------------------------------------ + # Context Storage + # ------------------------------------------------------------------ + + def _load_data(self) -> dict: + try: + result = self.capability_worker.get_single_key(STORAGE_KEY) + if result and result.get("value"): + return result["value"] + return _empty_data() + except Exception as e: + self.worker.editor_logging_handler.error(f"[PortfolioMonitor] Load error: {e}") + return _empty_data() + + def _save_data(self, data: dict): + try: + self.capability_worker.update_key(STORAGE_KEY, data) + except Exception: + try: + self.capability_worker.create_key(STORAGE_KEY, data) + except Exception as e: + self.worker.editor_logging_handler.error(f"[PortfolioMonitor] Save error: {e}") + + # ------------------------------------------------------------------ + # Intent handlers + # ------------------------------------------------------------------ + + async def _handle_portfolio(self): + data = self._load_data() + holdings = data.get("holdings", []) + if not holdings: + await self.capability_worker.speak( + "No stocks in your portfolio yet. Say 'add a stock' to start tracking." + ) + return + + cache = data.get("price_cache", {}) + changed = False + + # Pre-scan for stale tickers; announce once if multiple need fetching + stale_tickers = [] + for h in holdings: + ticker = h["ticker"] + q = cache.get(ticker) + is_stale = True + if q: + try: + cached_at = datetime.strptime(q["cached_at"], "%Y-%m-%dT%H:%M:%S") + is_stale = (datetime.utcnow() - cached_at).total_seconds() > CACHE_TTL_SECONDS + except Exception: + pass + if is_stale: + stale_tickers.append(h["ticker"]) + + if len(stale_tickers) > 1: + await self.capability_worker.speak("Fetching latest prices...") + elif len(stale_tickers) == 1: + solo = next(h for h in holdings if h["ticker"] == stale_tickers[0]) + await self.capability_worker.speak(f"Fetching {solo.get('name', stale_tickers[0])}...") + + for ticker in stale_tickers: + quote = self._fetch_quote(ticker) + if quote: + cache[ticker] = { + **quote, + "cached_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"), + } + changed = True + + if changed: + data["price_cache"] = cache + self._save_data(data) + + # Compute totals for snapshot + total_value = 0.0 + total_cost = 0.0 + day_pnl = 0.0 + for h in holdings: + q = cache.get(h["ticker"]) + if not q: + continue + price = q["price"] + prev_close = q.get("prev_close", price) + shares = h.get("shares", 0) + avg_cost = h.get("avg_cost", 0) + total_value += price * shares + total_cost += avg_cost * shares + day_pnl += (price - prev_close) * shares + + total_pnl = total_value - total_cost + total_pnl_pct = (total_pnl / total_cost * 100) if total_cost else 0 + day_dir = "up" if day_pnl >= 0 else "down" + overall_dir = "up" if total_pnl >= 0 else "down" + + # Snapshot — tight two lines, then navigation prompt + await self.capability_worker.speak( + f"Portfolio at ${total_value:,.0f} — {day_dir} ${abs(day_pnl):,.0f} today, " + f"{overall_dir} ${abs(total_pnl):,.0f} ({abs(total_pnl_pct):.0f}%) overall." + ) + await self.capability_worker.speak( + "Say 'breakdown' for full detail, 'compare' for day-over-day, " + "'movers' for biggest movers, 'market' for market indices, or a stock name." + ) + + while True: + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + break + action = self._classify_hub_action(reply) + if action == "BREAKDOWN": + await self._speak_portfolio_breakdown(data) + elif action == "COMPARE": + await self._handle_compare(data) + elif action == "MOVERS": + await self._handle_movers(data) + elif action == "MARKET": + await self._handle_market() + elif action == "CHECK": + ticker = self._resolve_ticker(reply) + if ticker: + await self._speak_single_stock(ticker, data) + else: + await self.capability_worker.speak( + "Which stock? Say a company name or ticker symbol." + ) + else: + ticker = self._resolve_ticker(reply) + if ticker: + await self._speak_single_stock(ticker, data) + else: + await self.capability_worker.speak( + "Say 'breakdown', 'compare', 'movers', 'market', a stock name, or stop." + ) + + async def _speak_portfolio_breakdown(self, data: dict): + holdings = data.get("holdings", []) + cache = data.get("price_cache", {}) + stock_lines = [] + + for h in holdings: + ticker = h["ticker"] + name = h.get("name", ticker) + shares = h.get("shares", 0) + avg_cost = h.get("avg_cost", 0) + q = cache.get(ticker) + + if not q: + stock_lines.append(f"{name}: no data available") + continue + + price = q["price"] + change_pct = q.get("change_pct", 0) + position_value = price * shares + position_cost = avg_cost * shares + position_pnl = position_value - position_cost + position_pnl_pct = (position_pnl / position_cost * 100) if position_cost else 0 + + day_dir = "up" if change_pct >= 0 else "down" + pos_dir = "up" if position_pnl >= 0 else "down" + stock_lines.append( + f"{name}: ${price:,.0f}, {day_dir} {abs(change_pct):.0f}% today, " + f"{pos_dir} ${abs(position_pnl):,.0f} ({abs(position_pnl_pct):.0f}%) overall" + ) + + if not stock_lines: + return + chunk_size = 4 + for i in range(0, len(stock_lines), chunk_size): + chunk = stock_lines[i:i + chunk_size] + await self.capability_worker.speak(". ".join(chunk) + ".") + if i + chunk_size < len(stock_lines): + await self.capability_worker.speak("Want to hear the rest?") + reply = await self.capability_worker.user_response() + if self._is_exit(reply) or not _AFFIRMATIVE_PATTERN.search(reply or ""): + break + + async def _handle_compare(self, data: dict | None = None): + if data is None: + data = self._load_data() + holdings = data.get("holdings", []) + if not holdings: + await self.capability_worker.speak("No stocks in your portfolio yet.") + return + + cache = data.get("price_cache", {}) + lines = [] + no_data = [] + + for h in holdings: + ticker = h["ticker"] + name = h.get("name", ticker) + q = cache.get(ticker) + if not q or not q.get("prev_close"): + no_data.append(name) + continue + + price = q["price"] + prev_close = q["prev_close"] + change_pct = q.get("change_pct", 0) + day_dollar = abs(price - prev_close) + direction = "up" if change_pct >= 0 else "down" + pct_str = f"{abs(change_pct):.0f}" if abs(change_pct) >= 1 else "less than 1" + lines.append(f"{name} {direction} {pct_str}% (${day_dollar:,.0f})") + + if not lines: + await self.capability_worker.speak( + "No comparison data yet — say 'my portfolio' first to fetch current prices." + ) + return + + chunk_size = 4 + for i in range(0, len(lines), chunk_size): + chunk = lines[i:i + chunk_size] + prefix = "Today versus yesterday: " if i == 0 else "Continuing: " + await self.capability_worker.speak(prefix + ", ".join(chunk) + ".") + if i + chunk_size < len(lines): + await self.capability_worker.speak("Want to hear the rest?") + reply = await self.capability_worker.user_response() + if self._is_exit(reply) or not _AFFIRMATIVE_PATTERN.search(reply or ""): + break + if no_data: + await self.capability_worker.speak(f"No data for: {', '.join(no_data)}.") + + async def _speak_single_stock(self, ticker: str, data: dict): + cache = data.get("price_cache", {}) + q = cache.get(ticker) + if not q: + await self.capability_worker.speak(f"Fetching {ticker}...") + q = self._fetch_quote(ticker) + if q: + data.setdefault("price_cache", {})[ticker] = { + **q, + "cached_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"), + } + self._save_data(data) + + if not q: + await self.capability_worker.speak( + f"Couldn't get data for {ticker} right now. Try again in a moment." + ) + return + + price = q["price"] + change_pct = q.get("change_pct", 0) + day_dir = "up" if change_pct >= 0 else "down" + pct_str = f"{abs(change_pct):.0f}" if abs(change_pct) >= 1 else "less than 1" + + holding = next( + (h for h in data.get("holdings", []) if h["ticker"] == ticker), None + ) + name = holding.get("name", ticker) if holding else ( + next((c.title() for c, t in TICKER_MAP.items() if t == ticker), ticker) + ) + + msg = f"{name} is at ${price:,.0f}, {day_dir} {pct_str} percent today." + + if holding: + shares = holding["shares"] + avg_cost = holding["avg_cost"] + pos_value = price * shares + pos_cost = avg_cost * shares + pos_pnl = pos_value - pos_cost + pos_pnl_pct = (pos_pnl / pos_cost * 100) if pos_cost else 0 + pos_dir = "up" if pos_pnl >= 0 else "down" + msg += ( + f" Your {shares:g} shares are worth ${pos_value:,.0f} — " + f"{pos_dir} ${abs(pos_pnl):,.0f} ({abs(pos_pnl_pct):.0f}%) on your position." + ) + else: + msg += " Not in your portfolio — say 'add a stock' to track it." + + await self.capability_worker.speak(msg) + + async def _handle_check(self, trigger_text: str): + ticker = self._resolve_ticker(trigger_text) + + if not ticker: + await self.capability_worker.speak( + "Which stock should I check? Say the company name or ticker symbol." + ) + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + ticker = self._resolve_ticker(reply) + + if not ticker: + await self.capability_worker.speak( + "I couldn't identify that stock. Try using the ticker symbol like AAPL or TSLA." + ) + return + + while ticker: + data = self._load_data() + + cached = data.get("price_cache", {}).get(ticker) + is_fresh = False + if cached: + try: + cached_at = datetime.strptime(cached["cached_at"], "%Y-%m-%dT%H:%M:%S") + is_fresh = (datetime.utcnow() - cached_at).total_seconds() < CACHE_TTL_SECONDS + except Exception: + pass + + if not is_fresh: + await self.capability_worker.speak(f"Checking {ticker}...") + quote = self._fetch_quote(ticker) + if not quote: + await self.capability_worker.speak( + f"Couldn't get data for {ticker} right now. Try again in a moment." + ) + return + data.setdefault("price_cache", {})[ticker] = { + **quote, + "cached_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"), + } + self._save_data(data) + + await self._speak_single_stock(ticker, data) + + await self.capability_worker.speak( + "Want to check another stock? Say a name or stop." + ) + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + break + ticker = self._resolve_ticker(reply) + if not ticker: + await self.capability_worker.speak( + "I couldn't identify that stock. Try using the ticker symbol like AAPL or TSLA." + ) + break + + async def _handle_movers(self, data: dict | None = None): + if data is None: + data = self._load_data() + holdings = data.get("holdings", []) + if not holdings: + await self.capability_worker.speak("No stocks in your portfolio yet.") + return + + cache = data.get("price_cache", {}) + movers = [] + for h in holdings: + ticker = h["ticker"] + q = cache.get(ticker) + if q: + movers.append((h.get("name", ticker), q.get("change_pct", 0))) + + if not movers: + await self.capability_worker.speak( + "No price data cached yet — say 'my portfolio' to fetch current prices." + ) + return + + movers.sort(key=lambda x: x[1]) + worst = movers[0] + best = movers[-1] + + parts = [] + if best[1] > 0: + parts.append(f"Biggest gainer: {best[0]}, up {best[1]:.0f} percent.") + if worst[1] < 0: + parts.append(f"Biggest loser: {worst[0]}, down {abs(worst[1]):.0f} percent.") + + if not parts: + await self.capability_worker.speak( + "Everything's flat today — no significant movers in your portfolio." + ) + return + + await self.capability_worker.speak(" ".join(parts)) + + async def _handle_market(self): + await self.capability_worker.speak("Checking market indices...") + parts = [] + for ticker, label in _MARKET_INDICES: + quote = self._fetch_quote(ticker) + if quote: + change_pct = quote.get("change_pct", 0) + direction = "up" if change_pct >= 0 else "down" + pct_str = f"{abs(change_pct):.1f}" if abs(change_pct) >= 0.1 else "flat" + parts.append(f"{label} {direction} {pct_str}%") + if not parts: + await self.capability_worker.speak( + "Couldn't fetch market data right now. Try again in a moment." + ) + return + await self.capability_worker.speak("Markets today: " + ", ".join(parts) + ".") + + async def _handle_update(self, trigger_text: str, ticker: str | None = None): + if ticker is None: + ticker = self._resolve_ticker(trigger_text) + if not ticker: + await self.capability_worker.speak("Which stock do you want to update?") + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + ticker = self._resolve_ticker(reply) + if not ticker: + await self.capability_worker.speak( + "I couldn't identify that stock. Try using the ticker symbol." + ) + return + + data = self._load_data() + holding = next((h for h in data.get("holdings", []) if h["ticker"] == ticker), None) + if not holding: + name = self._resolve_company_name(ticker) + await self.capability_worker.speak( + f"{name} isn't in your portfolio. Say 'add a stock' to add it." + ) + return + + name = holding.get("name", ticker) + shares = holding["shares"] + avg_cost = holding["avg_cost"] + await self.capability_worker.speak( + f"{name}: {shares:g} shares at ${avg_cost:,.0f} average, " + f"total cost ${shares * avg_cost:,.0f}. " + "Say 'bought more', 'sold some', or 'correct' to overwrite." + ) + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + r = reply.lower() + + if any(kw in r for kw in ("bought", "more", "buy", "added", "purchase")): + await self.capability_worker.speak( + "How many shares did you buy, and at what price? Say both — like '5 at 210'." + ) + details = await self.capability_worker.user_response() + if self._is_exit(details): + return + nums = [float(n.replace(",", "")) for n in re.findall(r'[\d,]+\.?\d*', details) if n] + if len(nums) < 2 or nums[0] <= 0 or nums[1] <= 0: + await self.capability_worker.speak( + "I need both the number of shares and the price." + ) + return + new_shares, new_price = nums[0], nums[1] + total_shares = shares + new_shares + new_avg = (shares * avg_cost + new_shares * new_price) / total_shares + holding["shares"] = total_shares + holding["avg_cost"] = round(new_avg, 2) + self._save_data(data) + await self.capability_worker.speak( + f"Updated — {name} now {total_shares:g} shares at ${new_avg:,.0f} average." + ) + + elif any(kw in r for kw in ("sold", "sell", "sale", "reduced")): + await self.capability_worker.speak("How many shares did you sell?") + details = await self.capability_worker.user_response() + if self._is_exit(details): + return + nums = [float(n.replace(",", "")) for n in re.findall(r'[\d,]+\.?\d*', details) if n] + if not nums or nums[0] <= 0: + await self.capability_worker.speak("I need to know how many shares you sold.") + return + sold = nums[0] + remaining = shares - sold + if remaining <= 0: + confirmed = await self.capability_worker.run_confirmation_loop( + f"That would leave zero shares. Remove {name} from your portfolio?" + ) + if confirmed: + data["holdings"] = [h for h in data["holdings"] if h["ticker"] != ticker] + data.get("alert_thresholds", {}).pop(ticker, None) + data.get("price_cache", {}).pop(ticker, None) + self._save_data(data) + await self.capability_worker.speak(f"Removed {name} from your portfolio.") + else: + holding["shares"] = remaining + self._save_data(data) + await self.capability_worker.speak( + f"Updated — {name} now {remaining:g} shares remaining." + ) + + elif any(kw in r for kw in ("correct", "overwrite", "set", "change", "update")): + await self.capability_worker.speak( + "What's the correct number of shares and average cost? " + "Say both — like '10 at 175'." + ) + details = await self.capability_worker.user_response() + if self._is_exit(details): + return + nums = [float(n.replace(",", "")) for n in re.findall(r'[\d,]+\.?\d*', details) if n] + if len(nums) < 2 or nums[0] <= 0 or nums[1] <= 0: + await self.capability_worker.speak( + "I need both the number of shares and the price." + ) + return + holding["shares"] = nums[0] + holding["avg_cost"] = nums[1] + self._save_data(data) + await self.capability_worker.speak( + f"Updated — {name} corrected to {nums[0]:g} shares at ${nums[1]:,.0f} average." + ) + + else: + await self.capability_worker.speak( + "Say 'bought more', 'sold some', or 'correct' to update the position." + ) + + async def _handle_add(self, trigger_text: str): + while True: + trigger_clean = trigger_text.lower().strip() + generic_trigger = not trigger_clean or trigger_clean in _ADD_COMMAND_PHRASES + + name = None # resolved early in non-generic branch; reused at save time + + if generic_trigger: + await self.capability_worker.speak( + "Which stock, how many shares, and at what price? " + "Say it all — like 'Apple 10 shares at 180'." + ) + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + + ticker = self._resolve_ticker(reply) + if not ticker: + await self.capability_worker.speak( + "I didn't catch the stock. Say 'add a stock' to try again." + ) + return + + nums = re.findall(r'[\d,]+\.?\d*', reply) + nums_clean = [] + for n in nums: + try: + nums_clean.append(float(n.replace(",", ""))) + except ValueError: + pass + + if len(nums_clean) < 2: + await self.capability_worker.speak( + "I need both the number of shares and the price. " + "Say 'add a stock' and include all three — like 'Apple 10 shares at 180'." + ) + return + + shares = nums_clean[0] + avg_cost = nums_clean[1] + + else: + ticker = self._resolve_ticker(trigger_text) + if not ticker: + await self.capability_worker.speak( + "Say 'add a stock' to add to your portfolio." + ) + return + + nums = re.findall(r'[\d,]+\.?\d*', trigger_text) + nums_clean = [] + for n in nums: + try: + nums_clean.append(float(n.replace(",", ""))) + except ValueError: + pass + + if len(nums_clean) >= 2: + shares = nums_clean[0] + avg_cost = nums_clean[1] + else: + name = self._resolve_company_name(ticker) + await self.capability_worker.speak( + f"Adding {name}. How many shares and at what price? " + "Say both — like '10 shares at 180'." + ) + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + + more_nums = re.findall(r'[\d,]+\.?\d*', reply) + more_clean = [] + for n in more_nums: + try: + more_clean.append(float(n.replace(",", ""))) + except ValueError: + pass + + if len(more_clean) < 2: + await self.capability_worker.speak( + "Say both the number of shares and the price — like '10 at 180'." + ) + return + + shares = more_clean[0] + avg_cost = more_clean[1] + + if shares <= 0 or avg_cost <= 0: + await self.capability_worker.speak( + "Shares and price must be greater than zero. Say 'add a stock' to try again." + ) + return + + data = self._load_data() + existing = next( + (h for h in data.get("holdings", []) if h["ticker"] == ticker), None + ) + if existing: + ex_name = existing.get("name", ticker) + await self.capability_worker.speak( + f"{ex_name} is already in your portfolio — " + f"{existing['shares']:g} shares at ${existing['avg_cost']:,.0f} average. " + "Want to update this position?" + ) + confirm = await self.capability_worker.user_response() + if _AFFIRMATIVE_PATTERN.search(confirm or ""): + await self._handle_update("", ticker=ticker) + return + else: + if name is None: + name = self._resolve_company_name(ticker) + holding = { + "id": str(int(datetime.now().timestamp() * 1000)), + "ticker": ticker, + "name": name, + "shares": shares, + "avg_cost": avg_cost, + "added_at": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), + } + data["holdings"].append(holding) + self._save_data(data) + await self.capability_worker.speak( + f"Added {shares:g} shares of {name} at ${avg_cost:,.0f} average. " + f"Total position: ${shares * avg_cost:,.0f}." + ) + + await self.capability_worker.speak( + "Want to add another stock? Say a stock name or stop." + ) + next_reply = await self.capability_worker.user_response() + if self._is_exit(next_reply): + return + trigger_text = "" if _AFFIRMATIVE_PATTERN.search(next_reply) else next_reply + + async def _handle_set_alert(self, trigger_text: str): + ticker = self._resolve_ticker(trigger_text) + + if not ticker: + await self.capability_worker.speak( + "Which stock do you want to set an alert for?" + ) + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + ticker = self._resolve_ticker(reply) + + if not ticker: + await self.capability_worker.speak("I couldn't identify that stock.") + return + + while ticker: + data = self._load_data() + holding = next( + (h for h in data.get("holdings", []) if h["ticker"] == ticker), None + ) + if not holding: + await self.capability_worker.speak( + f"{ticker} isn't in your portfolio — add it first." + ) + return + + name = holding.get("name", ticker) + + await self.capability_worker.speak( + f"Alert me if {name} drops how many percent in a day? Say a number or skip." + ) + drop_reply = await self.capability_worker.user_response() + drop_pct = None + if not self._is_exit(drop_reply) and "skip" not in drop_reply.lower(): + m = re.search(r'[\d.]+', drop_reply) + if m: + drop_pct = float(m.group()) + + await self.capability_worker.speak( + f"Alert me if {name} rises how many percent in a day? Say a number or skip." + ) + rise_reply = await self.capability_worker.user_response() + rise_pct = None + if not self._is_exit(rise_reply) and "skip" not in rise_reply.lower(): + m = re.search(r'[\d.]+', rise_reply) + if m: + rise_pct = float(m.group()) + + if drop_pct is None and rise_pct is None: + await self.capability_worker.speak("No thresholds set.") + else: + ticker_thresholds = data.setdefault("alert_thresholds", {}).setdefault(ticker, {}) + if drop_pct is not None: + ticker_thresholds["drop_pct"] = drop_pct + if rise_pct is not None: + ticker_thresholds["rise_pct"] = rise_pct + self._save_data(data) + + parts = [] + if drop_pct is not None: + parts.append(f"drops {drop_pct:.0f}%") + if rise_pct is not None: + parts.append(f"rises {rise_pct:.0f}%") + await self.capability_worker.speak( + f"Done — I'll alert you if {name} {' or '.join(parts)} in a day." + ) + + await self.capability_worker.speak( + "Want to set an alert for another stock? Say a stock name or stop." + ) + next_reply = await self.capability_worker.user_response() + if self._is_exit(next_reply): + return + ticker = self._resolve_ticker(next_reply) + if not ticker: + await self.capability_worker.speak("I couldn't identify that stock.") + return + + async def _handle_remove(self, trigger_text: str): + data = self._load_data() + holdings = data.get("holdings", []) + if not holdings: + await self.capability_worker.speak("Your portfolio is empty.") + return + + ticker = self._resolve_ticker(trigger_text) + + if not ticker: + await self.capability_worker.speak("Which stock do you want to remove?") + reply = await self.capability_worker.user_response() + if self._is_exit(reply): + return + ticker = self._resolve_ticker(reply) + + if not ticker: + await self.capability_worker.speak("I couldn't identify that stock.") + return + + holding = next((h for h in holdings if h["ticker"] == ticker), None) + if not holding: + await self.capability_worker.speak(f"{ticker} isn't in your portfolio.") + return + + name = holding.get("name", ticker) + confirmed = await self.capability_worker.run_confirmation_loop( + f"Remove {name} from your portfolio?" + ) + if confirmed: + data["holdings"] = [h for h in holdings if h["ticker"] != ticker] + data.get("alert_thresholds", {}).pop(ticker, None) + data.get("price_cache", {}).pop(ticker, None) + self._save_data(data) + await self.capability_worker.speak(f"Removed {name}.") + else: + await self.capability_worker.speak("Keeping it.") + + async def _handle_clear(self): + data = self._load_data() + count = len(data.get("holdings", [])) + if count == 0: + await self.capability_worker.speak("Portfolio is already empty.") + return + + confirmed = await self.capability_worker.run_confirmation_loop( + f"Clear all {count} {'stock' if count == 1 else 'stocks'} from your portfolio?" + ) + if confirmed: + data["holdings"] = [] + data["alert_thresholds"] = {} + data["price_cache"] = {} + data["alerted_today"] = [] + self._save_data(data) + await self.capability_worker.speak("Portfolio cleared.") + else: + await self.capability_worker.speak("Keeping everything.") + + # ------------------------------------------------------------------ + # Main run loop + # ------------------------------------------------------------------ + + async def _run(self): + try: + self.finnhub_key = self.capability_worker.get_api_keys("finnhub_api_key") or "" + self.av_key = self.capability_worker.get_api_keys("alphavantage_api_key") or "" + + if not self.finnhub_key: + await self.capability_worker.speak( + "Portfolio Monitor needs a Finnhub API key to work. " + "Add it in Settings under API Keys — get a free one at finnhub dot io." + ) + return + + trigger_text = await self.capability_worker.wait_for_complete_transcription() + if not trigger_text or not isinstance(trigger_text, str): + trigger_text = "" + + intent = self._classify_intent(trigger_text) + self.worker.editor_logging_handler.info( + f"[PortfolioMonitor] Intent: {intent} | Trigger: {trigger_text[:80]}" + ) + + if intent == "PORTFOLIO": + await self._handle_portfolio() + elif intent == "CHECK": + await self._handle_check(trigger_text) + elif intent == "COMPARE": + await self._handle_compare() + elif intent == "MOVERS": + await self._handle_movers() + elif intent == "ADD": + await self._handle_add(trigger_text) + elif intent == "UPDATE": + await self._handle_update(trigger_text) + elif intent == "SET_ALERT": + await self._handle_set_alert(trigger_text) + elif intent == "REMOVE": + await self._handle_remove(trigger_text) + elif intent == "CLEAR": + await self._handle_clear() + elif intent == "MARKET": + await self._handle_market() + else: + await self.capability_worker.speak( + "I can show your portfolio, check a stock, track movers, " + "or add a stock. What would you like?" + ) + + except Exception as e: + self.worker.editor_logging_handler.error(f"[PortfolioMonitor] Skill error: {e}") + try: + await self.capability_worker.speak( + "Something went wrong. Try asking again in a moment." + ) + except Exception: + pass + finally: + self.capability_worker.resume_normal_flow() + + # ------------------------------------------------------------------ + # Entry point + # ------------------------------------------------------------------ + + def call(self, worker: AgentWorker): + self.worker = worker + self.capability_worker = CapabilityWorker(self.worker) + self.worker.session_tasks.create(self._run())