diff --git a/Cargo.lock b/Cargo.lock index 11a00e0..258ce02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,9 +113,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitflags" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" [[package]] name = "block-buffer" @@ -589,9 +589,9 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.8" +version = "0.27.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2b52f86d1d4bc0d6b4e6826d960b1b333217e07d36b882dca570a5e1c48895b" +checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ "http", "hyper", @@ -829,9 +829,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.184" +version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" [[package]] name = "litemap" @@ -960,11 +960,13 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openab" -version = "0.7.4" +version = "0.7.6" dependencies = [ "anyhow", + "async-trait", "base64", "clap", + "futures-util", "image", "libc", "rand 0.8.5", @@ -975,6 +977,7 @@ dependencies = [ "serde_json", "serenity", "tokio", + "tokio-tungstenite", "toml", "tracing", "tracing-subscriber", @@ -1114,7 +1117,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand 0.9.3", + "rand 0.9.4", "ring", "rustc-hash", "rustls 0.23.38", @@ -1174,9 +1177,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.3" +version = "0.9.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ec095654a25171c2124e9e3393a930bddbffdc939556c914957a4c3e0a87166" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.5", @@ -1365,7 +1368,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.11", + "rustls-webpki 0.103.12", "subtle", "zeroize", ] @@ -1393,9 +1396,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.11" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20a6af516fea4b20eccceaf166e8aa666ac996208e8a644ce3ef5aa783bc7cd4" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "ring", "rustls-pki-types", @@ -1760,9 +1763,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.51.1" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" +checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776" dependencies = [ "bytes", "libc", diff --git a/Cargo.toml b/Cargo.toml index 8687438..419bade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,8 @@ serenity = { version = "0.12", default-features = false, features = ["client", " uuid = { version = "1", features = ["v4"] } regex = "1" anyhow = "1" +async-trait = "0.1" +futures-util = "0.3" rand = "0.8" clap = { version = "4", features = ["derive"] } rpassword = "7" @@ -22,3 +24,4 @@ base64 = "0.22" image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] } unicode-width = "0.2" libc = "0.2" +tokio-tungstenite = { version = "0.21", features = ["rustls-tls-webpki-roots"] } diff --git a/README.md b/README.md index 33c5c33..bf79e83 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,18 @@ 
![OpenAB banner](images/banner.png) -A lightweight, secure, cloud-native ACP harness that bridges Discord and any [Agent Client Protocol](https://github.com/anthropics/agent-protocol)-compatible coding CLI (Kiro CLI, Claude Code, Codex, Gemini, OpenCode, Copilot CLI, etc.) over stdio JSON-RPC β€” delivering the next-generation development experience. +A lightweight, secure, cloud-native ACP harness that bridges **Discord, Slack**, and any [Agent Client Protocol](https://github.com/anthropics/agent-protocol)-compatible coding CLI (Kiro CLI, Claude Code, Codex, Gemini, OpenCode, Copilot CLI, etc.) over stdio JSON-RPC β€” delivering the next-generation development experience. πŸͺΌ **Join our community!** Come say hi on Discord β€” we'd love to have you: **[πŸͺΌ OpenAB β€” Official](https://discord.gg/YNksK9M6)** πŸŽ‰ ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” Gateway WS β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” ACP stdio β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ Discord │◄─────────────►│ openab │──────────────►│ coding CLI β”‚ -β”‚ User β”‚ β”‚ (Rust) │◄── JSON-RPC ──│ (acp mode) β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +β”‚ Discord │◄─────────────►│ │──────────────►│ coding CLI β”‚ +β”‚ User β”‚ β”‚ openab │◄── JSON-RPC ──│ (acp mode) β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ Socket Mode β”‚ (Rust) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +β”‚ Slack │◄─────────────►│ β”‚ +β”‚ User β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` ## Demo @@ -21,6 +24,7 @@ A lightweight, secure, cloud-native ACP harness that bridges Discord and any [Ag ## Features +- **Multi-platform** β€” supports Discord and Slack, run one or both simultaneously - **Pluggable agent backend** β€” swap between Kiro CLI, Claude Code, Codex, Gemini, OpenCode, Copilot CLI via config - 
**@mention trigger** — mention the bot in an allowed channel to start a conversation - **Thread-based multi-turn** — auto-creates threads; no @mention needed for follow-ups @@ -33,10 +37,22 @@ A lightweight, secure, cloud-native ACP harness that bridges Discord and any [Ag ## Quick Start -### 1. Create a Discord Bot +### 1. Create a Bot + +
+Discord See [docs/discord-bot-howto.md](docs/discord-bot-howto.md) for a detailed step-by-step guide. +
+ +
+Slack + +See [docs/slack-bot-howto.md](docs/slack-bot-howto.md) for a detailed step-by-step guide. + +
+ ### 2. Install with Helm (Kiro CLI — default) ```bash @@ -46,6 +62,13 @@ helm repo update helm install openab openab/openab \ --set agents.kiro.discord.botToken="$DISCORD_BOT_TOKEN" \ --set-string 'agents.kiro.discord.allowedChannels[0]=YOUR_CHANNEL_ID' + +# Slack +helm install openab openab/openab \ + --set agents.kiro.slack.enabled=true \ + --set agents.kiro.slack.botToken="$SLACK_BOT_TOKEN" \ + --set agents.kiro.slack.appToken="$SLACK_APP_TOKEN" \ + --set-string 'agents.kiro.slack.allowedChannels[0]=C0123456789' ``` ### 3. Authenticate (first time only) @@ -64,6 +87,8 @@ In your Discord channel: The bot creates a thread. After that, just type in the thread — no @mention needed. +**Slack:** `@YourBot explain this code` in a channel — same thread-based workflow as Discord. + ## Other Agents | Agent | CLI | ACP Adapter | Guide | @@ -96,6 +121,12 @@ bot_token = "${DISCORD_BOT_TOKEN}" # supports env var expansion allowed_channels = ["123456789"] # channel ID allowlist # allowed_users = ["987654321"] # user ID allowlist (empty = all users) +[slack] +bot_token = "${SLACK_BOT_TOKEN}" # Bot User OAuth Token (xoxb-...) +app_token = "${SLACK_APP_TOKEN}" # App-Level Token (xapp-...) 
for Socket Mode +allowed_channels = ["C0123456789"] # channel ID allowlist (empty = allow all) +# allowed_users = ["U0123456789"] # user ID allowlist (empty = allow all) + [agent] command = "kiro-cli" # CLI command args = ["acp", "--trust-all-tools"] # ACP mode args @@ -184,15 +215,18 @@ kubectl apply -f k8s/deployment.yaml β”œβ”€β”€ config.toml.example # example config with all agent backends β”œβ”€β”€ k8s/ # Kubernetes manifests └── src/ - β”œβ”€β”€ main.rs # entrypoint: tokio + serenity + cleanup + shutdown + β”œβ”€β”€ main.rs # entrypoint: multi-adapter startup, cleanup, shutdown + β”œβ”€β”€ adapter.rs # ChatAdapter trait, AdapterRouter (platform-agnostic) β”œβ”€β”€ config.rs # TOML config + ${ENV_VAR} expansion - β”œβ”€β”€ discord.rs # Discord bot: mention, threads, edit-streaming - β”œβ”€β”€ format.rs # message splitting (2000 char limit) + β”œβ”€β”€ discord.rs # DiscordAdapter: serenity EventHandler + ChatAdapter impl + β”œβ”€β”€ slack.rs # SlackAdapter: Socket Mode + ChatAdapter impl + β”œβ”€β”€ media.rs # shared image resize/compress + STT download + β”œβ”€β”€ format.rs # message splitting, thread name shortening β”œβ”€β”€ reactions.rs # status reaction controller (debounce, stall detection) └── acp/ β”œβ”€β”€ protocol.rs # JSON-RPC types + ACP event classification β”œβ”€β”€ connection.rs # spawn CLI, stdio JSON-RPC communication - └── pool.rs # thread_id β†’ AcpConnection map + └── pool.rs # session key β†’ AcpConnection map ``` ## Inspired By diff --git a/charts/openab/templates/NOTES.txt b/charts/openab/templates/NOTES.txt index 3c69de9..f03ee06 100644 --- a/charts/openab/templates/NOTES.txt +++ b/charts/openab/templates/NOTES.txt @@ -1,15 +1,24 @@ openab {{ .Chart.AppVersion }} has been installed! -⚠️ Discord channel IDs must be set with --set-string (not --set) to avoid float64 precision loss. +⚠️ Channel/user IDs must be set with --set-string (not --set) to avoid float64 precision loss. 
Agents deployed: {{- range $name, $cfg := .Values.agents }} {{- if ne (include "openab.agentEnabled" $cfg) "false" }} β€’ {{ $name }} ({{ $cfg.command }}) -{{- if not $cfg.discord.botToken }} +{{- if not (or (and ($cfg.discord).enabled ($cfg.discord).botToken) (and ($cfg.slack).enabled ($cfg.slack).botToken)) }} ⚠️ No bot token provided. Create the secret manually: kubectl create secret generic {{ include "openab.agentFullname" (dict "ctx" $ "agent" $name) }} \ - --from-literal=discord-bot-token="YOUR_TOKEN" + --from-literal=discord-bot-token="YOUR_DISCORD_TOKEN" +{{- end }} + +{{- if and ($cfg.discord).enabled ($cfg.discord).botToken }} + Discord: βœ… configured +{{- end }} +{{- if and ($cfg.slack).enabled ($cfg.slack).botToken }} + Slack: βœ… configured (Socket Mode) + Ensure your Slack app has these bot events: app_mention, message.channels, message.groups + Required scopes: app_mentions:read, chat:write, channels:history, groups:history, channels:read, groups:read, reactions:write, files:read {{- end }} {{- if eq $cfg.command "kiro-cli" }} diff --git a/charts/openab/templates/configmap.yaml b/charts/openab/templates/configmap.yaml index cb7dce8..d90c3c5 100644 --- a/charts/openab/templates/configmap.yaml +++ b/charts/openab/templates/configmap.yaml @@ -10,6 +10,7 @@ metadata: {{- include "openab.labels" $d | nindent 4 }} data: config.toml: | + {{- if and ($cfg.discord).enabled ($cfg.discord).botToken }} [discord] bot_token = "${DISCORD_BOT_TOKEN}" {{- range $cfg.discord.allowedChannels }} @@ -41,6 +42,25 @@ data: {{- if $cfg.discord.trustedBotIds }} trusted_bot_ids = {{ $cfg.discord.trustedBotIds | toJson }} {{- end }} + {{- end }} + + {{- if and ($cfg.slack).enabled }} + [slack] + bot_token = "${SLACK_BOT_TOKEN}" + app_token = "${SLACK_APP_TOKEN}" + {{- range ($cfg.slack).allowedChannels }} + {{- if regexMatch "e\\+|E\\+" (toString .) 
}} + {{- fail (printf "slack.allowedChannels contains a mangled ID: %s β€” use --set-string instead of --set for channel IDs" (toString .)) }} + {{- end }} + {{- end }} + allowed_channels = {{ ($cfg.slack).allowedChannels | default list | toJson }} + {{- range ($cfg.slack).allowedUsers }} + {{- if regexMatch "e\\+|E\\+" (toString .) }} + {{- fail (printf "slack.allowedUsers contains a mangled ID: %s β€” use --set-string instead of --set for user IDs" (toString .)) }} + {{- end }} + {{- end }} + allowed_users = {{ ($cfg.slack).allowedUsers | default list | toJson }} + {{- end }} [agent] command = "{{ $cfg.command }}" diff --git a/charts/openab/templates/deployment.yaml b/charts/openab/templates/deployment.yaml index b237475..2f895b2 100644 --- a/charts/openab/templates/deployment.yaml +++ b/charts/openab/templates/deployment.yaml @@ -38,13 +38,27 @@ spec: {{- toYaml . | nindent 12 }} {{- end }} env: - {{- if $cfg.discord.botToken }} + {{- if and ($cfg.discord).enabled ($cfg.discord).botToken }} - name: DISCORD_BOT_TOKEN valueFrom: secretKeyRef: name: {{ include "openab.agentFullname" $d }} key: discord-bot-token {{- end }} + {{- if and ($cfg.slack).enabled ($cfg.slack).botToken }} + - name: SLACK_BOT_TOKEN + valueFrom: + secretKeyRef: + name: {{ include "openab.agentFullname" $d }} + key: slack-bot-token + {{- end }} + {{- if and ($cfg.slack).enabled ($cfg.slack).appToken }} + - name: SLACK_APP_TOKEN + valueFrom: + secretKeyRef: + name: {{ include "openab.agentFullname" $d }} + key: slack-app-token + {{- end }} {{- if and ($cfg.stt).enabled ($cfg.stt).apiKey }} - name: STT_API_KEY valueFrom: diff --git a/charts/openab/templates/secret.yaml b/charts/openab/templates/secret.yaml index 2cdd27c..d6907fd 100644 --- a/charts/openab/templates/secret.yaml +++ b/charts/openab/templates/secret.yaml @@ -1,6 +1,9 @@ {{- range $name, $cfg := .Values.agents }} {{- if ne (include "openab.agentEnabled" $cfg) "false" }} -{{- if $cfg.discord.botToken }} +{{- $hasDiscord := and 
($cfg.discord).enabled ($cfg.discord).botToken }} +{{- $hasSlack := and ($cfg.slack).enabled (or ($cfg.slack).botToken ($cfg.slack).appToken) }} +{{- $hasStt := and ($cfg.stt).enabled ($cfg.stt).apiKey }} +{{- if or $hasDiscord $hasSlack $hasStt }} {{- $d := dict "ctx" $ "agent" $name "cfg" $cfg }} --- apiVersion: v1 @@ -13,8 +16,16 @@ metadata: "helm.sh/resource-policy": keep type: Opaque data: + {{- if $hasDiscord }} discord-bot-token: {{ $cfg.discord.botToken | b64enc | quote }} - {{- if and ($cfg.stt).enabled ($cfg.stt).apiKey }} + {{- end }} + {{- if and ($cfg.slack).enabled ($cfg.slack).botToken }} + slack-bot-token: {{ $cfg.slack.botToken | b64enc | quote }} + {{- end }} + {{- if and ($cfg.slack).enabled ($cfg.slack).appToken }} + slack-app-token: {{ $cfg.slack.appToken | b64enc | quote }} + {{- end }} + {{- if $hasStt }} stt-api-key: {{ $cfg.stt.apiKey | b64enc | quote }} {{- end }} {{- end }} diff --git a/charts/openab/values.yaml b/charts/openab/values.yaml index 8ed2ad9..aa9d414 100644 --- a/charts/openab/values.yaml +++ b/charts/openab/values.yaml @@ -115,6 +115,7 @@ agents: - acp - --trust-all-tools discord: + enabled: true # set to false to disable; enabled with empty botToken is treated as disabled botToken: "" # ⚠️ Use --set-string for channel IDs to avoid float64 precision loss allowedChannels: @@ -126,6 +127,12 @@ agents: allowBotMessages: "off" # trustedBotIds: [] # empty = any bot (mode permitting); set to restrict trustedBotIds: [] + slack: + enabled: false + botToken: "" # Bot User OAuth Token (xoxb-...) + appToken: "" # App-Level Token (xapp-...) for Socket Mode + allowedChannels: [] # empty = allow all channels + allowedUsers: [] # empty = allow all users workingDir: /home/agent env: {} envFrom: [] diff --git a/config.toml.example b/config.toml.example index 48cb44e..927db02 100644 --- a/config.toml.example +++ b/config.toml.example @@ -1,11 +1,19 @@ +# Enable one or more adapters. At least one [discord] or [slack] section is required. 
+ [discord] bot_token = "${DISCORD_BOT_TOKEN}" -allowed_channels = ["1234567890"] +allowed_channels = ["1234567890"] # empty or omitted = allow all channels # allowed_users = [""] # empty or omitted = allow all users # allow_bot_messages = "off" # "off" (default) | "mentions" | "all" # "mentions" is recommended for multi-agent collaboration # trusted_bot_ids = [] # empty = any bot (mode permitting); set to restrict +# [slack] +# bot_token = "${SLACK_BOT_TOKEN}" # Bot User OAuth Token (xoxb-...) +# app_token = "${SLACK_APP_TOKEN}" # App-Level Token (xapp-...) for Socket Mode +# allowed_channels = ["C0123456789"] # empty or omitted = allow all channels +# allowed_users = ["U0123456789"] # empty or omitted = allow all users + [agent] command = "kiro-cli" args = ["acp", "--trust-all-tools"] diff --git a/docs/slack-bot-howto.md b/docs/slack-bot-howto.md new file mode 100644 index 0000000..8fa774e --- /dev/null +++ b/docs/slack-bot-howto.md @@ -0,0 +1,124 @@ +# Slack Bot Setup Guide + +Step-by-step guide to create and configure a Slack bot for openab. + +## 1. Create a Slack App + +1. Go to https://api.slack.com/apps +2. Click **Create New App** β†’ **From scratch** +3. Enter an app name (e.g. "OpenAB") and select your workspace +4. Click **Create App** + +## 2. Enable Socket Mode + +Socket Mode uses a persistent WebSocket connection β€” no public URL or ingress needed. + +1. In the left sidebar, click **Socket Mode** +2. Toggle **Enable Socket Mode** to ON +3. You'll be prompted to generate an **App-Level Token**: + - Token name: `openab-socket` (or any name) + - Scope: `connections:write` + - Click **Generate** +4. Copy the token (`xapp-...`) β€” this is your `SLACK_APP_TOKEN` + +## 3. Subscribe to Events + +1. In the left sidebar, click **Event Subscriptions** +2. Toggle **Enable Events** to ON +3. 
Under **Subscribe to bot events**, add: + - `app_mention` β€” triggers when someone @mentions the bot + - `message.channels` β€” receives messages in public channels (for thread follow-ups) + - `message.groups` β€” receives messages in private channels (for thread follow-ups) +4. Click **Save Changes** + +## 4. Add Bot Token Scopes + +1. In the left sidebar, click **OAuth & Permissions** +2. Under **Bot Token Scopes**, add: + +| Scope | Purpose | +|-------|---------| +| `app_mentions:read` | Receive @mention events | +| `chat:write` | Send and edit messages | +| `channels:history` | Read public channel messages (for thread context) | +| `groups:history` | Read private channel messages (for thread context) | +| `channels:read` | List public channels | +| `groups:read` | List private channels | +| `reactions:write` | Add/remove emoji reactions | +| `files:read` | Download file attachments (images, audio) | +| `users:read` | Resolve user display names | + +## 5. Install to Workspace + +1. In the left sidebar, click **Install App** +2. Click **Install to Workspace** (or **Reinstall** if you've changed scopes) +3. Authorize the requested permissions +4. Copy the **Bot User OAuth Token** (`xoxb-...`) β€” this is your `SLACK_BOT_TOKEN` + +## 6. Configure openab + +Add the `[slack]` section to your `config.toml`: + +```toml +[slack] +bot_token = "${SLACK_BOT_TOKEN}" +app_token = "${SLACK_APP_TOKEN}" +allowed_channels = [] # empty = allow all channels +# allowed_users = ["U0123456789"] # empty = allow all users +``` + +Set the environment variables: + +```bash +export SLACK_BOT_TOKEN="xoxb-..." +export SLACK_APP_TOKEN="xapp-..." +``` + +## 7. Invite the Bot + +In each Slack channel where you want to use the bot: + +``` +/invite @OpenAB +``` + +## 8. Test + +In a channel where the bot is invited: + +``` +@OpenAB explain this code +``` + +The bot will reply in a thread. After that, just type in the thread β€” no @mention needed for follow-ups. 
+ +## Finding Channel and User IDs + +- **Channel ID**: Right-click the channel name β†’ **View channel details** β†’ ID at the bottom (starts with `C` for public, `G` for private) +- **User ID**: Click a user's profile β†’ **...** menu β†’ **Copy member ID** (starts with `U`) + +## Troubleshooting + +### Bot doesn't respond to @mentions + +1. Verify Socket Mode is enabled in your app settings +2. Check that `app_mention` is subscribed under **bot events** (not user events) +3. Ensure the app is reinstalled after adding new event subscriptions +4. Check the bot is invited to the channel (`/invite @YourSlackAppName`) +5. Run with `RUST_LOG=openab=debug cargo run` to see incoming events + +### Bot doesn't respond to thread follow-ups + +1. Verify `message.channels` (and `message.groups` for private channels) are subscribed under **bot events** +2. Reinstall the app after adding these events + +### "not_authed" or "invalid_auth" errors + +1. Verify your `SLACK_BOT_TOKEN` starts with `xoxb-` +2. Verify your `SLACK_APP_TOKEN` starts with `xapp-` +3. Check the tokens haven't been revoked in your app settings + +### Reactions not showing + +1. Verify `reactions:write` scope is added +2. Reinstall the app after adding the scope diff --git a/src/adapter.rs b/src/adapter.rs new file mode 100644 index 0000000..2c2d096 --- /dev/null +++ b/src/adapter.rs @@ -0,0 +1,456 @@ +use anyhow::Result; +use async_trait::async_trait; +use serde::Serialize; +use std::sync::Arc; +use tokio::sync::watch; +use tracing::error; + +use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool}; +use crate::config::ReactionsConfig; +use crate::error_display::{format_coded_error, format_user_error}; +use crate::format; +use crate::reactions::StatusReactionController; + +// --- Platform-agnostic types --- + +/// Identifies a channel or thread across platforms. 
+#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct ChannelRef { + pub platform: String, + pub channel_id: String, + /// Thread within a channel (e.g. Slack thread_ts, Telegram topic_id). + /// For Discord, threads are separate channels so this is None. + pub thread_id: Option, + /// Parent channel if this is a thread-as-channel (Discord). + pub parent_id: Option, +} + +/// Identifies a message across platforms. +#[derive(Clone, Debug)] +pub struct MessageRef { + pub channel: ChannelRef, + pub message_id: String, +} + +/// Sender identity injected into prompts for downstream agent context. +#[derive(Clone, Debug, Serialize)] +pub struct SenderContext { + pub schema: String, + pub sender_id: String, + pub sender_name: String, + pub display_name: String, + pub channel: String, + pub channel_id: String, + pub is_bot: bool, +} + +// --- ChatAdapter trait --- + +#[async_trait] +pub trait ChatAdapter: Send + Sync + 'static { + /// Platform name for logging and session key namespacing. + fn platform(&self) -> &'static str; + + /// Maximum message length for this platform (e.g. 2000 for Discord, 4000 for Slack). + fn message_limit(&self) -> usize; + + /// Send a new message, returns a reference to the sent message. + async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result; + + /// Edit an existing message in-place. + async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()>; + + /// Create a thread from a trigger message, returns the thread channel ref. + async fn create_thread( + &self, + channel: &ChannelRef, + trigger_msg: &MessageRef, + title: &str, + ) -> Result; + + /// Add a reaction/emoji to a message. + async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>; + + /// Remove a reaction/emoji from a message. 
+ async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>; +} + +// --- AdapterRouter --- + +/// Shared logic for routing messages to ACP agents, managing sessions, +/// streaming edits, and controlling reactions. Platform-independent. +pub struct AdapterRouter { + pool: Arc, + reactions_config: ReactionsConfig, +} + +impl AdapterRouter { + pub fn new(pool: Arc, reactions_config: ReactionsConfig) -> Self { + Self { + pool, + reactions_config, + } + } + + /// Handle an incoming user message. The adapter is responsible for + /// filtering, resolving the thread, and building the SenderContext. + /// This method handles sender context injection, session management, and streaming. + pub async fn handle_message( + &self, + adapter: &Arc, + thread_channel: &ChannelRef, + sender: &SenderContext, + prompt: &str, + extra_blocks: Vec, + trigger_msg: &MessageRef, + ) -> Result<()> { + tracing::debug!(platform = adapter.platform(), "processing message"); + + // Build content blocks: sender context + prompt text, then extra (images, transcripts) + let sender_json = serde_json::to_string(sender).unwrap(); + let prompt_with_sender = format!( + "\n{}\n\n\n{}", + sender_json, prompt + ); + + let mut content_blocks = Vec::with_capacity(1 + extra_blocks.len()); + // Prepend any transcript blocks (they go before the text block) + for block in &extra_blocks { + if matches!(block, ContentBlock::Text { .. }) { + content_blocks.push(block.clone()); + } + } + content_blocks.push(ContentBlock::Text { + text: prompt_with_sender, + }); + // Append non-text blocks (images) + for block in extra_blocks { + if !matches!(block, ContentBlock::Text { .. 
}) { + content_blocks.push(block); + } + } + + let thinking_msg = adapter.send_message(thread_channel, "...").await?; + + let thread_key = format!( + "{}:{}", + adapter.platform(), + thread_channel + .thread_id + .as_deref() + .unwrap_or(&thread_channel.channel_id) + ); + + if let Err(e) = self.pool.get_or_create(&thread_key).await { + let msg = format_user_error(&e.to_string()); + let _ = adapter + .edit_message(&thinking_msg, &format!("⚠️ {msg}")) + .await; + error!("pool error: {e}"); + return Err(e); + } + + let reactions = Arc::new(StatusReactionController::new( + self.reactions_config.enabled, + adapter.clone(), + trigger_msg.clone(), + self.reactions_config.emojis.clone(), + self.reactions_config.timing.clone(), + )); + reactions.set_queued().await; + + let result = self + .stream_prompt( + adapter, + &thread_key, + content_blocks, + thread_channel, + &thinking_msg, + reactions.clone(), + ) + .await; + + match &result { + Ok(()) => reactions.set_done().await, + Err(_) => reactions.set_error().await, + } + + let hold_ms = if result.is_ok() { + self.reactions_config.timing.done_hold_ms + } else { + self.reactions_config.timing.error_hold_ms + }; + if self.reactions_config.remove_after_reply { + let reactions = reactions; + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(hold_ms)).await; + reactions.clear().await; + }); + } + + if let Err(ref e) = result { + let _ = adapter + .edit_message(&thinking_msg, &format!("⚠️ {e}")) + .await; + } + + result + } + + async fn stream_prompt( + &self, + adapter: &Arc, + thread_key: &str, + content_blocks: Vec, + thread_channel: &ChannelRef, + thinking_msg: &MessageRef, + reactions: Arc, + ) -> Result<()> { + let adapter = adapter.clone(); + let thread_channel = thread_channel.clone(); + let msg_ref = thinking_msg.clone(); + let message_limit = adapter.message_limit(); + + self.pool + .with_connection(thread_key, |conn| { + let content_blocks = content_blocks.clone(); + Box::pin(async move { + 
let reset = conn.session_reset; + conn.session_reset = false; + + let (mut rx, _) = conn.session_prompt(content_blocks).await?; + reactions.set_thinking().await; + + let initial = if reset { + "⚠️ _Session expired, starting fresh..._\n\n...".to_string() + } else { + "...".to_string() + }; + let (buf_tx, buf_rx) = watch::channel(initial); + + let mut text_buf = String::new(); + let mut tool_lines: Vec = Vec::new(); + + if reset { + text_buf.push_str("⚠️ _Session expired, starting fresh..._\n\n"); + } + + // Spawn edit-streaming task β€” only edits the single message, never sends new ones. + // Long content is truncated during streaming; final multi-message split happens after. + let streaming_limit = message_limit.saturating_sub(100); + let edit_handle = { + let adapter = adapter.clone(); + let msg_ref = msg_ref.clone(); + let mut buf_rx = buf_rx.clone(); + tokio::spawn(async move { + let mut last_content = String::new(); + loop { + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + if buf_rx.has_changed().unwrap_or(false) { + let content = buf_rx.borrow_and_update().clone(); + if content != last_content { + let display = if content.chars().count() > streaming_limit { + // Tail-priority: keep the last N chars so user + // sees the most recent agent output + let total = content.chars().count(); + let skip = total - streaming_limit; + let truncated: String = content.chars().skip(skip).collect(); + format!("…(truncated)\n{truncated}") + } else { + content.clone() + }; + let _ = adapter.edit_message(&msg_ref, &display).await; + last_content = content; + } + } + if buf_rx.has_changed().is_err() { + break; + } + } + }) + }; + + // Process ACP notifications + let mut got_first_text = false; + let mut response_error: Option = None; + while let Some(notification) = rx.recv().await { + if notification.id.is_some() { + if let Some(ref err) = notification.error { + response_error = Some(format_coded_error(err.code, &err.message)); + } + break; + } + + if let 
Some(event) = classify_notification(¬ification) { + match event { + AcpEvent::Text(t) => { + if !got_first_text { + got_first_text = true; + } + text_buf.push_str(&t); + let _ = + buf_tx.send(compose_display(&tool_lines, &text_buf, true)); + } + AcpEvent::Thinking => { + reactions.set_thinking().await; + } + AcpEvent::ToolStart { id, title } if !title.is_empty() => { + reactions.set_tool(&title).await; + let title = sanitize_title(&title); + if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) { + slot.title = title; + slot.state = ToolState::Running; + } else { + tool_lines.push(ToolEntry { + id, + title, + state: ToolState::Running, + }); + } + let _ = + buf_tx.send(compose_display(&tool_lines, &text_buf, true)); + } + AcpEvent::ToolDone { id, title, status } => { + reactions.set_thinking().await; + let new_state = if status == "completed" { + ToolState::Completed + } else { + ToolState::Failed + }; + if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) { + if !title.is_empty() { + slot.title = sanitize_title(&title); + } + slot.state = new_state; + } else if !title.is_empty() { + tool_lines.push(ToolEntry { + id, + title: sanitize_title(&title), + state: new_state, + }); + } + let _ = + buf_tx.send(compose_display(&tool_lines, &text_buf, true)); + } + _ => {} + } + } + } + + conn.prompt_done().await; + drop(buf_tx); + let _ = edit_handle.await; + + // Final edit with complete content + let final_content = compose_display(&tool_lines, &text_buf, false); + let final_content = if final_content.is_empty() { + if let Some(err) = response_error { + format!("⚠️ {err}") + } else { + "_(no response)_".to_string() + } + } else if let Some(err) = response_error { + format!("⚠️ {err}\n\n{final_content}") + } else { + final_content + }; + + let chunks = format::split_message(&final_content, message_limit); + let mut current_msg = msg_ref; + for (i, chunk) in chunks.iter().enumerate() { + if i == 0 { + let _ = adapter.edit_message(¤t_msg, chunk).await; + } 
else if let Ok(new_msg) = + adapter.send_message(&thread_channel, chunk).await + { + current_msg = new_msg; + } + } + + Ok(()) + }) + }) + .await + } +} + +/// Flatten a tool-call title into a single line safe for inline-code spans. +fn sanitize_title(title: &str) -> String { + title.replace('\r', "").replace('\n', " ; ").replace('`', "'") +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ToolState { + Running, + Completed, + Failed, +} + +#[derive(Debug, Clone)] +struct ToolEntry { + id: String, + title: String, + state: ToolState, +} + +impl ToolEntry { + fn render(&self) -> String { + let icon = match self.state { + ToolState::Running => "πŸ”§", + ToolState::Completed => "βœ…", + ToolState::Failed => "❌", + }; + let suffix = if self.state == ToolState::Running { "..." } else { "" }; + format!("{icon} `{}`{}", self.title, suffix) + } +} + +/// Maximum number of finished tool entries to show individually +/// during streaming before collapsing into a summary line. +const TOOL_COLLAPSE_THRESHOLD: usize = 3; + +fn compose_display(tool_lines: &[ToolEntry], text: &str, streaming: bool) -> String { + let mut out = String::new(); + if !tool_lines.is_empty() { + if streaming { + let done = tool_lines.iter().filter(|e| e.state == ToolState::Completed).count(); + let failed = tool_lines.iter().filter(|e| e.state == ToolState::Failed).count(); + let running: Vec<_> = tool_lines.iter().filter(|e| e.state == ToolState::Running).collect(); + let finished = done + failed; + + if finished <= TOOL_COLLAPSE_THRESHOLD { + for entry in tool_lines.iter().filter(|e| e.state != ToolState::Running) { + out.push_str(&entry.render()); + out.push('\n'); + } + } else { + let mut parts = Vec::new(); + if done > 0 { parts.push(format!("βœ… {done}")); } + if failed > 0 { parts.push(format!("❌ {failed}")); } + out.push_str(&format!("{} tool(s) completed\n", parts.join(" Β· "))); + } + + if running.len() <= TOOL_COLLAPSE_THRESHOLD { + for entry in &running { + 
out.push_str(&entry.render()); + out.push('\n'); + } + } else { + let hidden = running.len() - TOOL_COLLAPSE_THRESHOLD; + out.push_str(&format!("πŸ”§ {hidden} more running\n")); + for entry in running.iter().skip(hidden) { + out.push_str(&entry.render()); + out.push('\n'); + } + } + } else { + for entry in tool_lines { + out.push_str(&entry.render()); + out.push('\n'); + } + } + if !out.is_empty() { out.push('\n'); } + } + out.push_str(text.trim_end()); + out +} diff --git a/src/config.rs b/src/config.rs index 658e00b..46efb1c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -33,7 +33,8 @@ impl<'de> Deserialize<'de> for AllowBots { #[derive(Debug, Deserialize)] pub struct Config { - pub discord: DiscordConfig, + pub discord: Option, + pub slack: Option, pub agent: AgentConfig, #[serde(default)] pub pool: PoolConfig, @@ -87,6 +88,16 @@ pub struct DiscordConfig { pub trusted_bot_ids: Vec, } +#[derive(Debug, Deserialize)] +pub struct SlackConfig { + pub bot_token: String, + pub app_token: String, + #[serde(default)] + pub allowed_channels: Vec, + #[serde(default)] + pub allowed_users: Vec, +} + #[derive(Debug, Deserialize)] pub struct AgentConfig { pub command: String, diff --git a/src/discord.rs b/src/discord.rs index 278d3bf..ea02b7c 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -1,53 +1,132 @@ -use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool}; -use crate::config::{AllowBots, ReactionsConfig, SttConfig}; -use crate::error_display::{format_coded_error, format_user_error}; +use crate::acp::ContentBlock; +use crate::adapter::{AdapterRouter, ChatAdapter, ChannelRef, MessageRef, SenderContext}; +use crate::config::{AllowBots, SttConfig}; use crate::format; -use crate::reactions::StatusReactionController; -use base64::engine::general_purpose::STANDARD as BASE64; -use base64::Engine; -use image::ImageReader; -use std::io::Cursor; +use crate::media; +use async_trait::async_trait; use std::sync::LazyLock; -use serenity::async_trait; -use 
serenity::model::channel::{Message, ReactionType}; +use serenity::builder::{CreateThread, EditMessage}; +use serenity::http::Http; +use serenity::model::channel::{AutoArchiveDuration, Message, ReactionType}; use serenity::model::gateway::Ready; use serenity::model::id::{ChannelId, MessageId, UserId}; use serenity::model::user::User; use serenity::prelude::*; use std::collections::HashSet; -use std::sync::Arc; -use tokio::sync::watch; -use tracing::{debug, error, info, warn}; +use std::sync::{Arc, OnceLock}; +use tracing::{debug, error, info}; -/// Hard cap on consecutive bot messages (from any other bot) in a -/// channel or thread. When this many recent messages are all from -/// bots other than ourselves, we stop responding to prevent runaway -/// loops between multiple bots in "all" mode. -/// -/// Note: must be ≀ 255 because Serenity's `GetMessages::limit()` takes `u8`. -/// Inspired by OpenClaw's `session.agentToAgent.maxPingPongTurns`. +/// Hard cap on consecutive bot messages in a channel or thread. +/// Prevents runaway loops between multiple bots in "all" mode. const MAX_CONSECUTIVE_BOT_TURNS: u8 = 10; -/// Reusable HTTP client for downloading Discord attachments. -/// Built once with a 30s timeout and rustls TLS (no native-tls deps). 
-static HTTP_CLIENT: LazyLock = LazyLock::new(|| { - reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(30)) - .build() - .expect("static HTTP client must build") -}); +// --- DiscordAdapter: implements ChatAdapter for Discord via serenity --- + +pub struct DiscordAdapter { + http: Arc, +} + +impl DiscordAdapter { + pub fn new(http: Arc) -> Self { + Self { http } + } +} + +#[async_trait] +impl ChatAdapter for DiscordAdapter { + fn platform(&self) -> &'static str { + "discord" + } + + fn message_limit(&self) -> usize { + 2000 + } + + async fn send_message(&self, channel: &ChannelRef, content: &str) -> anyhow::Result { + let ch_id: u64 = channel.channel_id.parse()?; + let msg = ChannelId::new(ch_id).say(&self.http, content).await?; + Ok(MessageRef { + channel: channel.clone(), + message_id: msg.id.to_string(), + }) + } + + async fn edit_message(&self, msg: &MessageRef, content: &str) -> anyhow::Result<()> { + let ch_id: u64 = msg.channel.channel_id.parse()?; + let msg_id: u64 = msg.message_id.parse()?; + ChannelId::new(ch_id) + .edit_message( + &self.http, + MessageId::new(msg_id), + EditMessage::new().content(content), + ) + .await?; + Ok(()) + } + + async fn create_thread( + &self, + channel: &ChannelRef, + trigger_msg: &MessageRef, + title: &str, + ) -> anyhow::Result { + let ch_id: u64 = channel.channel_id.parse()?; + let msg_id: u64 = trigger_msg.message_id.parse()?; + let thread = ChannelId::new(ch_id) + .create_thread_from_message( + &self.http, + MessageId::new(msg_id), + CreateThread::new(title).auto_archive_duration(AutoArchiveDuration::OneDay), + ) + .await?; + Ok(ChannelRef { + platform: "discord".into(), + channel_id: thread.id.to_string(), + thread_id: None, + parent_id: Some(channel.channel_id.clone()), + }) + } + + async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> anyhow::Result<()> { + let ch_id: u64 = msg.channel.channel_id.parse()?; + let msg_id: u64 = msg.message_id.parse()?; + self.http + .create_reaction( + 
ChannelId::new(ch_id), + MessageId::new(msg_id), + &ReactionType::Unicode(emoji.to_string()), + ) + .await?; + Ok(()) + } + + async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> anyhow::Result<()> { + let ch_id: u64 = msg.channel.channel_id.parse()?; + let msg_id: u64 = msg.message_id.parse()?; + self.http + .delete_reaction_me( + ChannelId::new(ch_id), + MessageId::new(msg_id), + &ReactionType::Unicode(emoji.to_string()), + ) + .await?; + Ok(()) + } +} + +// --- Handler: serenity EventHandler that delegates to AdapterRouter --- pub struct Handler { - pub pool: Arc, + pub router: Arc, pub allowed_channels: HashSet, pub allowed_users: HashSet, - pub reactions_config: ReactionsConfig, pub stt_config: SttConfig, + pub adapter: OnceLock>, pub allow_bot_messages: AllowBots, pub trusted_bot_ids: HashSet, } -#[async_trait] +#[serenity::async_trait] impl EventHandler for Handler { async fn message(&self, ctx: Context, msg: Message) { let bot_id = ctx.cache.current_user().id; @@ -57,34 +136,27 @@ impl EventHandler for Handler { return; } + let adapter = self.adapter.get_or_init(|| { + Arc::new(DiscordAdapter::new(ctx.http.clone())) + }).clone(); + let channel_id = msg.channel_id.get(); let in_allowed_channel = self.allowed_channels.is_empty() || self.allowed_channels.contains(&channel_id); let is_mentioned = msg.mentions_user_id(bot_id) || msg.content.contains(&format!("<@{}>", bot_id)) - || msg.mention_roles.iter().any(|r| msg.content.contains(&format!("<@&{}>", r))); + || msg + .mention_roles + .iter() + .any(|r| msg.content.contains(&format!("<@&{}>", r))); - // Bot message gating β€” runs after self-ignore but before channel/user - // allowlist checks. This ordering is intentional: channel checks below - // apply uniformly to both human and bot messages, so a bot mention in - // a non-allowed channel is still rejected by the channel check. 
+ // Bot message gating (from upstream #321) if msg.author.bot { match self.allow_bot_messages { AllowBots::Off => return, AllowBots::Mentions => if !is_mentioned { return; }, AllowBots::All => { - // Safety net: count consecutive messages from any bot - // (excluding ourselves) in recent history. If all recent - // messages are from other bots, we've likely entered a - // loop. This counts *all* other-bot messages, not just - // one specific bot β€” so 3 bots taking turns still hits - // the cap (which is intentionally conservative). - // - // Try cache first to avoid an API call on every bot - // message. Fall back to API on cache miss. If both fail, - // reject the message (fail-closed) to avoid unbounded - // loops during Discord API outages. let cap = MAX_CONSECUTIVE_BOT_TURNS as usize; let history = ctx.cache.channel_messages(msg.channel_id) .map(|msgs| { @@ -92,7 +164,7 @@ impl EventHandler for Handler { .filter(|(mid, _)| **mid < msg.id) .map(|(_, m)| m.clone()) .collect(); - recent.sort_unstable_by(|a, b| b.id.cmp(&a.id)); // newest first + recent.sort_unstable_by(|a, b| b.id.cmp(&a.id)); recent.truncate(cap); recent }) @@ -123,7 +195,6 @@ impl EventHandler for Handler { }, } - // If trusted_bot_ids is set, only allow bots on the list if !self.trusted_bot_ids.is_empty() && !self.trusted_bot_ids.contains(&msg.author.id.get()) { tracing::debug!(bot_id = %msg.author.id, "bot not in trusted_bot_ids, ignoring"); return; @@ -161,9 +232,8 @@ impl EventHandler for Handler { if !self.allowed_users.is_empty() && !self.allowed_users.contains(&msg.author.id.get()) { tracing::info!(user_id = %msg.author.id, "denied user, ignoring"); - if let Err(e) = msg.react(&ctx.http, ReactionType::Unicode("🚫".into())).await { - tracing::warn!(error = %e, "failed to react with 🚫"); - } + let msg_ref = discord_msg_ref(&msg); + let _ = adapter.add_reaction(&msg_ref, "🚫").await; return; } @@ -173,86 +243,80 @@ impl EventHandler for Handler { msg.content.trim().to_string() }; - // No 
text and no image attachments β†’ skip to avoid wasting session slots + // No text and no attachments β†’ skip if prompt.is_empty() && msg.attachments.is_empty() { return; } - // Build content blocks: text + image attachments - let mut content_blocks = vec![]; - - // Inject structured sender context so the downstream CLI can identify who sent the message - let display_name = msg.member.as_ref() + let display_name = msg + .member + .as_ref() .and_then(|m| m.nick.as_ref()) .unwrap_or(&msg.author.name); - let sender_ctx = serde_json::json!({ - "schema": "openab.sender.v1", - "sender_id": msg.author.id.to_string(), - "sender_name": msg.author.name, - "display_name": display_name, - "channel": "discord", - "channel_id": msg.channel_id.to_string(), - "is_bot": msg.author.bot, - }); - let prompt_with_sender = format!( - "\n{}\n\n\n{}", - serde_json::to_string(&sender_ctx).unwrap(), - prompt - ); - - // Add text block (always, even if empty, we still send for sender context) - content_blocks.push(ContentBlock::Text { - text: prompt_with_sender.clone(), - }); + let sender = SenderContext { + schema: "openab.sender.v1".into(), + sender_id: msg.author.id.to_string(), + sender_name: msg.author.name.clone(), + display_name: display_name.to_string(), + channel: "discord".into(), + channel_id: msg.channel_id.to_string(), + is_bot: msg.author.bot, + }; - // Process attachments: route by content type (audio β†’ STT, image β†’ encode) - let mut audio_skipped = false; - if !msg.attachments.is_empty() { - for attachment in &msg.attachments { - if is_audio_attachment(attachment) { - if self.stt_config.enabled { - if let Some(transcript) = download_and_transcribe(attachment, &self.stt_config).await { - debug!(filename = %attachment.filename, chars = transcript.len(), "voice transcript injected"); - content_blocks.insert(0, ContentBlock::Text { - text: format!("[Voice message transcript]: {transcript}"), - }); - } - } else { - warn!(filename = %attachment.filename, "skipping audio 
attachment (STT disabled)"); - audio_skipped = true; + // Build extra content blocks from attachments (images, audio) + let mut extra_blocks = Vec::new(); + for attachment in &msg.attachments { + let mime = attachment.content_type.as_deref().unwrap_or(""); + if media::is_audio_mime(mime) { + if self.stt_config.enabled { + let mime_clean = mime.split(';').next().unwrap_or(mime).trim(); + if let Some(transcript) = media::download_and_transcribe( + &attachment.url, + &attachment.filename, + mime_clean, + u64::from(attachment.size), + &self.stt_config, + None, + ).await { + debug!(filename = %attachment.filename, chars = transcript.len(), "voice transcript injected"); + extra_blocks.insert(0, ContentBlock::Text { + text: format!("[Voice message transcript]: {transcript}"), + }); } - } else if let Some(content_block) = download_and_encode_image(attachment).await { - debug!(url = %attachment.url, filename = %attachment.filename, "adding image attachment"); - content_blocks.push(content_block); + } else { + tracing::warn!(filename = %attachment.filename, "skipping audio attachment (STT disabled)"); + let msg_ref = discord_msg_ref(&msg); + let _ = adapter.add_reaction(&msg_ref, "🎀").await; } - } - } - - // If audio was skipped, react with 🎀 so the user knows their voice message was noticed - if audio_skipped { - let _ = msg.react(&ctx.http, ReactionType::Unicode("🎀".into())).await; - // Voice-only message: no text and only the sender_context block β†’ early return - if prompt.is_empty() && content_blocks.len() == 1 { - return; + } else if let Some(block) = media::download_and_encode_image( + &attachment.url, + attachment.content_type.as_deref(), + &attachment.filename, + u64::from(attachment.size), + None, + ).await { + debug!(url = %attachment.url, filename = %attachment.filename, "adding image attachment"); + extra_blocks.push(block); } } tracing::debug!( - text_len = prompt_with_sender.len(), + num_extra_blocks = extra_blocks.len(), num_attachments = 
msg.attachments.len(), in_thread, "processing" ); - // Note: image-only messages (no text) are intentionally allowed since - // prompt_with_sender always includes the non-empty sender_context XML. - // The guard above (prompt.is_empty() && no attachments) handles stickers/embeds. - - let thread_id = if in_thread { - msg.channel_id.get() + let thread_channel = if in_thread { + ChannelRef { + platform: "discord".into(), + channel_id: msg.channel_id.get().to_string(), + thread_id: None, + parent_id: None, + } } else { - match get_or_create_thread(&ctx, &msg, &prompt).await { - Ok(id) => id, + match get_or_create_thread(&ctx, &adapter, &msg, &prompt).await { + Ok(ch) => ch, Err(e) => { error!("failed to create thread: {e}"); return; @@ -260,68 +324,14 @@ impl EventHandler for Handler { } }; - let thread_channel = ChannelId::new(thread_id); - - let thinking_msg = match thread_channel.say(&ctx.http, "...").await { - Ok(m) => m, - Err(e) => { - error!("failed to post: {e}"); - return; - } - }; - - let thread_key = thread_id.to_string(); - if let Err(e) = self.pool.get_or_create(&thread_key).await { - let msg = format_user_error(&e.to_string()); - let _ = edit(&ctx, thread_channel, thinking_msg.id, &format!("⚠️ {}", msg)).await; - error!("pool error: {e}"); - return; - } - - // Create reaction controller on the user's original message - let reactions = Arc::new(StatusReactionController::new( - self.reactions_config.enabled, - ctx.http.clone(), - msg.channel_id, - msg.id, - self.reactions_config.emojis.clone(), - self.reactions_config.timing.clone(), - )); - reactions.set_queued().await; - - // Stream prompt with live edits (pass content blocks instead of just text) - let result = stream_prompt( - &self.pool, - &thread_key, - content_blocks, - &ctx, - thread_channel, - thinking_msg.id, - reactions.clone(), - ) - .await; - - match &result { - Ok(()) => reactions.set_done().await, - Err(_) => reactions.set_error().await, - } - - // Hold emoji briefly then clear - let hold_ms 
= if result.is_ok() { - self.reactions_config.timing.done_hold_ms - } else { - self.reactions_config.timing.error_hold_ms - }; - if self.reactions_config.remove_after_reply { - let reactions = reactions; - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(hold_ms)).await; - reactions.clear().await; - }); - } + let trigger_msg = discord_msg_ref(&msg); - if let Err(e) = result { - let _ = edit(&ctx, thread_channel, thinking_msg.id, &format!("⚠️ {e}")).await; + if let Err(e) = self + .router + .handle_message(&adapter, &thread_channel, &sender, &prompt, extra_blocks, &trigger_msg) + .await + { + error!("handle_message error: {e}"); } } @@ -330,465 +340,47 @@ impl EventHandler for Handler { } } -/// Check if an attachment is an audio file (voice messages are typically audio/ogg). -fn is_audio_attachment(attachment: &serenity::model::channel::Attachment) -> bool { - let mime = attachment.content_type.as_deref().unwrap_or(""); - mime.starts_with("audio/") -} - -/// Download an audio attachment and transcribe it via the configured STT provider. 
-async fn download_and_transcribe( - attachment: &serenity::model::channel::Attachment, - stt_config: &SttConfig, -) -> Option { - const MAX_SIZE: u64 = 25 * 1024 * 1024; // 25 MB (Whisper API limit) - - if u64::from(attachment.size) > MAX_SIZE { - error!(filename = %attachment.filename, size = attachment.size, "audio exceeds 25MB limit"); - return None; - } - - let resp = HTTP_CLIENT.get(&attachment.url).send().await.ok()?; - if !resp.status().is_success() { - error!(url = %attachment.url, status = %resp.status(), "audio download failed"); - return None; +// --- Discord-specific helpers --- + +fn discord_msg_ref(msg: &Message) -> MessageRef { + MessageRef { + channel: ChannelRef { + platform: "discord".into(), + channel_id: msg.channel_id.get().to_string(), + thread_id: None, + parent_id: None, + }, + message_id: msg.id.to_string(), } - let bytes = resp.bytes().await.ok()?.to_vec(); - - let mime_type = attachment.content_type.as_deref().unwrap_or("audio/ogg"); - let mime_type = mime_type.split(';').next().unwrap_or(mime_type).trim(); - - crate::stt::transcribe(&HTTP_CLIENT, stt_config, bytes, attachment.filename.clone(), mime_type).await } -/// Maximum dimension (width or height) for resized images. -/// Matches OpenClaw's DEFAULT_IMAGE_MAX_DIMENSION_PX. -const IMAGE_MAX_DIMENSION_PX: u32 = 1200; - -/// JPEG quality for compressed output (OpenClaw uses progressive 85β†’35; -/// we start at 75 which is a good balance of quality vs size). -const IMAGE_JPEG_QUALITY: u8 = 75; - -/// Download a Discord image attachment, resize/compress it, then base64-encode -/// as an ACP image content block. -/// -/// Large images are resized so the longest side is at most 1200px and -/// re-encoded as JPEG at quality 75. This keeps the base64 payload well -/// under typical JSON-RPC transport limits (~200-400KB after encoding). 
-async fn download_and_encode_image(attachment: &serenity::model::channel::Attachment) -> Option { - const MAX_SIZE: u64 = 10 * 1024 * 1024; // 10 MB - - let url = &attachment.url; - if url.is_empty() { - return None; - } - - // Determine media type β€” prefer content-type header, fallback to extension - let media_type = attachment - .content_type - .as_deref() - .or_else(|| { - attachment - .filename - .rsplit('.') - .next() - .and_then(|ext| match ext.to_lowercase().as_str() { - "png" => Some("image/png"), - "jpg" | "jpeg" => Some("image/jpeg"), - "gif" => Some("image/gif"), - "webp" => Some("image/webp"), - _ => None, - }) - }); - - let Some(mime) = media_type else { - debug!(filename = %attachment.filename, "skipping non-image attachment"); - return None; - }; - let mime = mime.split(';').next().unwrap_or(mime).trim(); - if !mime.starts_with("image/") { - debug!(filename = %attachment.filename, mime = %mime, "skipping non-image attachment"); - return None; - } - - if u64::from(attachment.size) > MAX_SIZE { - error!(filename = %attachment.filename, size = attachment.size, "image exceeds 10MB limit"); - return None; - } - - let response = match HTTP_CLIENT.get(url).send().await { - Ok(resp) => resp, - Err(e) => { error!(url = %url, error = %e, "download failed"); return None; } - }; - if !response.status().is_success() { - error!(url = %url, status = %response.status(), "HTTP error downloading image"); - return None; - } - let bytes = match response.bytes().await { - Ok(b) => b, - Err(e) => { error!(url = %url, error = %e, "read failed"); return None; } - }; - - // Defense-in-depth: verify actual download size - if bytes.len() as u64 > MAX_SIZE { - error!(filename = %attachment.filename, size = bytes.len(), "downloaded image exceeds limit"); - return None; - } - - // Resize and compress - let (output_bytes, output_mime) = match resize_and_compress(&bytes) { - Ok(result) => result, - Err(e) => { - // Fallback: use original bytes but reject if too large for 
transport - if bytes.len() > 1024 * 1024 { - error!(filename = %attachment.filename, error = %e, size = bytes.len(), "resize failed and original too large, skipping"); - return None; - } - debug!(filename = %attachment.filename, error = %e, "resize failed, using original"); - (bytes.to_vec(), mime.to_string()) +async fn get_or_create_thread( + ctx: &Context, + adapter: &Arc, + msg: &Message, + prompt: &str, +) -> anyhow::Result { + let channel = msg.channel_id.to_channel(&ctx.http).await?; + if let serenity::model::channel::Channel::Guild(ref gc) = channel { + if gc.thread_metadata.is_some() { + return Ok(ChannelRef { + platform: "discord".into(), + channel_id: msg.channel_id.get().to_string(), + thread_id: None, + parent_id: None, + }); } - }; - - debug!( - filename = %attachment.filename, - original_size = bytes.len(), - compressed_size = output_bytes.len(), - "image processed" - ); - - let encoded = BASE64.encode(&output_bytes); - Some(ContentBlock::Image { - media_type: output_mime, - data: encoded, - }) -} - -/// Resize image so longest side ≀ IMAGE_MAX_DIMENSION_PX, then encode as JPEG. -/// Returns (compressed_bytes, mime_type). GIFs are passed through unchanged -/// to preserve animation. 
-fn resize_and_compress(raw: &[u8]) -> Result<(Vec, String), image::ImageError> { - let reader = ImageReader::new(Cursor::new(raw)) - .with_guessed_format()?; - - let format = reader.format(); - - // Pass through GIFs unchanged to preserve animation - if format == Some(image::ImageFormat::Gif) { - return Ok((raw.to_vec(), "image/gif".to_string())); } - let img = reader.decode()?; - let (w, h) = (img.width(), img.height()); - - // Resize preserving aspect ratio: scale so longest side = 1200px - let img = if w > IMAGE_MAX_DIMENSION_PX || h > IMAGE_MAX_DIMENSION_PX { - let max_side = std::cmp::max(w, h); - let ratio = f64::from(IMAGE_MAX_DIMENSION_PX) / f64::from(max_side); - let new_w = (f64::from(w) * ratio) as u32; - let new_h = (f64::from(h) * ratio) as u32; - img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3) - } else { - img + let thread_name = format::shorten_thread_name(prompt); + let parent = ChannelRef { + platform: "discord".into(), + channel_id: msg.channel_id.get().to_string(), + thread_id: None, + parent_id: None, }; - - // Encode as JPEG - let mut buf = Cursor::new(Vec::new()); - let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, IMAGE_JPEG_QUALITY); - img.write_with_encoder(encoder)?; - - Ok((buf.into_inner(), "image/jpeg".to_string())) -} - -async fn edit(ctx: &Context, ch: ChannelId, msg_id: MessageId, content: &str) -> serenity::Result { - ch.edit_message(&ctx.http, msg_id, serenity::builder::EditMessage::new().content(content)).await -} - -async fn stream_prompt( - pool: &SessionPool, - thread_key: &str, - content_blocks: Vec, - ctx: &Context, - channel: ChannelId, - msg_id: MessageId, - reactions: Arc, -) -> anyhow::Result<()> { - let reactions = reactions.clone(); - - pool.with_connection(thread_key, |conn| { - let content_blocks = content_blocks.clone(); - let ctx = ctx.clone(); - let reactions = reactions.clone(); - Box::pin(async move { - let reset = conn.session_reset; - conn.session_reset = false; - - let 
(mut rx, _): (_, _) = conn.session_prompt(content_blocks).await?; - reactions.set_thinking().await; - - let initial = if reset { - "⚠️ _Session expired, starting fresh..._\n\n...".to_string() - } else { - "...".to_string() - }; - let (buf_tx, buf_rx) = watch::channel(initial); - - let mut text_buf = String::new(); - // Tool calls indexed by toolCallId. Vec preserves first-seen - // order. We store id + title + state separately so a ToolDone - // event that arrives without a refreshed title (claude-agent-acp's - // update events don't always re-send the title field) can still - // reuse the title we already learned from a prior - // tool_call_update β€” only the icon flips πŸ”§ β†’ βœ… / ❌. Rendering - // happens on the fly in compose_display(). - let mut tool_lines: Vec = Vec::new(); - let current_msg_id = msg_id; - - if reset { - text_buf.push_str("⚠️ _Session expired, starting fresh..._\n\n"); - } - - // Spawn edit-streaming task β€” only edits the single message, never sends new ones. - // Long content is truncated during streaming; final multi-message split happens after. - let edit_handle = { - let ctx = ctx.clone(); - let mut buf_rx = buf_rx.clone(); - tokio::spawn(async move { - let mut last_content = String::new(); - loop { - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; - if buf_rx.has_changed().unwrap_or(false) { - let content = buf_rx.borrow_and_update().clone(); - if content != last_content { - let display = if content.chars().count() > 1900 { - // Tail-priority: keep the last 1900 chars so the - // user always sees the most recent agent output - // (e.g. a confirmation prompt) instead of old tool lines. 
- let total = content.chars().count(); - let skip = total - 1900; - let truncated: String = content.chars().skip(skip).collect(); - format!("…(truncated)\n{truncated}") - } else { - content.clone() - }; - let _ = edit(&ctx, channel, msg_id, &display).await; - last_content = content; - } - } - if buf_rx.has_changed().is_err() { - break; - } - } - }) - }; - - // Process ACP notifications - let mut got_first_text = false; - let mut response_error: Option = None; - while let Some(notification) = rx.recv().await { - if notification.id.is_some() { - // Capture error from ACP response to display in Discord - if let Some(ref err) = notification.error { - response_error = Some(format_coded_error(err.code, &err.message)); - } - break; - } - - if let Some(event) = classify_notification(¬ification) { - match event { - AcpEvent::Text(t) => { - if !got_first_text { - got_first_text = true; - // Reaction: back to thinking after tools - } - text_buf.push_str(&t); - let _ = buf_tx.send(compose_display(&tool_lines, &text_buf, true)); - } - AcpEvent::Thinking => { - reactions.set_thinking().await; - } - AcpEvent::ToolStart { id, title } if !title.is_empty() => { - reactions.set_tool(&title).await; - let title = sanitize_title(&title); - // Dedupe by toolCallId: replace if we've already - // seen this id, otherwise append a new entry. - // claude-agent-acp emits a placeholder title - // ("Terminal", "Edit", etc.) on the first event - // and refines it via tool_call_update; without - // dedup the placeholder and refined version - // appear as two separate orphaned lines. 
- if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) { - slot.title = title; - slot.state = ToolState::Running; - } else { - tool_lines.push(ToolEntry { - id, - title, - state: ToolState::Running, - }); - } - let _ = buf_tx.send(compose_display(&tool_lines, &text_buf, true)); - } - AcpEvent::ToolDone { id, title, status } => { - reactions.set_thinking().await; - let new_state = if status == "completed" { - ToolState::Completed - } else { - ToolState::Failed - }; - // Find by id (the title is unreliable β€” substring - // match against the placeholder "Terminal" would - // never find the refined entry). Preserve the - // existing title if the Done event omits it. - if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) { - if !title.is_empty() { - slot.title = sanitize_title(&title); - } - slot.state = new_state; - } else if !title.is_empty() { - // Done arrived without a prior Start (rare - // race) β€” record it so we still show - // something. - tool_lines.push(ToolEntry { - id, - title: sanitize_title(&title), - state: new_state, - }); - } - let _ = buf_tx.send(compose_display(&tool_lines, &text_buf, true)); - } - _ => {} - } - } - } - - conn.prompt_done().await; - drop(buf_tx); - let _ = edit_handle.await; - - // Final edit - let final_content = compose_display(&tool_lines, &text_buf, false); - // If ACP returned both an error and partial text, show both. - // This can happen when the agent started producing content before hitting an error - // (e.g. context length limit, rate limit mid-stream). Showing both gives users - // full context rather than hiding the partial response. 
- let final_content = if final_content.is_empty() { - if let Some(err) = response_error { - format!("⚠️ {}", err) - } else { - "_(no response)_".to_string() - } - } else if let Some(err) = response_error { - format!("⚠️ {}\n\n{}", err, final_content) - } else { - final_content - }; - - let chunks = format::split_message(&final_content, 2000); - for (i, chunk) in chunks.iter().enumerate() { - if i == 0 { - let _ = edit(&ctx, channel, current_msg_id, chunk).await; - } else { - let _ = channel.say(&ctx.http, chunk).await; - } - } - - Ok(()) - }) - }) - .await -} - -/// Flatten a tool-call title into a single line that's safe to render -/// inside Discord inline-code spans. Discord renders single-backtick -/// code on a single line only, so multi-line shell commands (heredocs, -/// `&&`-chained commands split across lines) appear truncated; we -/// collapse newlines to ` ; ` and rewrite embedded backticks so they -/// don't break the wrapping span. -fn sanitize_title(title: &str) -> String { - title.replace('\r', "").replace('\n', " ; ").replace('`', "'") -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum ToolState { - Running, - Completed, - Failed, -} - -#[derive(Debug, Clone)] -struct ToolEntry { - id: String, - title: String, - state: ToolState, -} - -impl ToolEntry { - fn render(&self) -> String { - let icon = match self.state { - ToolState::Running => "πŸ”§", - ToolState::Completed => "βœ…", - ToolState::Failed => "❌", - }; - let suffix = if self.state == ToolState::Running { "..." } else { "" }; - format!("{icon} `{}`{}", self.title, suffix) - } -} - -/// Maximum number of finished (or running) tool entries to show individually -/// during streaming before collapsing into a summary line. -/// -/// A typical tool line is 40–80 chars (icon + backtick title + suffix). -/// At 3 lines β‰ˆ 120–240 chars, consuming 6–13 % of the 1900-char Discord -/// streaming budget, leaving 1660+ chars for agent text. 
Beyond 3, tool -/// titles tend to grow (full shell commands, URLs) so budget consumption -/// rises non-linearly. 3 is also the practical "glanceable" limit. -const TOOL_COLLAPSE_THRESHOLD: usize = 3; - -fn compose_display(tool_lines: &[ToolEntry], text: &str, streaming: bool) -> String { - let mut out = String::new(); - if !tool_lines.is_empty() { - if streaming { - let done = tool_lines.iter().filter(|e| e.state == ToolState::Completed).count(); - let failed = tool_lines.iter().filter(|e| e.state == ToolState::Failed).count(); - let running: Vec<_> = tool_lines.iter().filter(|e| e.state == ToolState::Running).collect(); - let finished = done + failed; - - if finished <= TOOL_COLLAPSE_THRESHOLD { - for entry in tool_lines.iter().filter(|e| e.state != ToolState::Running) { - out.push_str(&entry.render()); - out.push('\n'); - } - } else { - let mut parts = Vec::new(); - if done > 0 { parts.push(format!("βœ… {done}")); } - if failed > 0 { parts.push(format!("❌ {failed}")); } - out.push_str(&format!("{} tool(s) completed\n", parts.join(" Β· "))); - } - - if running.len() <= TOOL_COLLAPSE_THRESHOLD { - for entry in &running { - out.push_str(&entry.render()); - out.push('\n'); - } - } else { - // Parallel running tools exceed threshold β€” show last N + summary - let hidden = running.len() - TOOL_COLLAPSE_THRESHOLD; - out.push_str(&format!("πŸ”§ {hidden} more running\n")); - for entry in running.iter().skip(hidden) { - out.push_str(&entry.render()); - out.push('\n'); - } - } - } else { - for entry in tool_lines { - out.push_str(&entry.render()); - out.push('\n'); - } - } - if !out.is_empty() { out.push('\n'); } - } - out.push_str(text.trim_end()); - out + let trigger_ref = discord_msg_ref(msg); + adapter.create_thread(&parent, &trigger_ref, &thread_name).await } static ROLE_MENTION_RE: LazyLock = LazyLock::new(|| { @@ -819,225 +411,3 @@ fn resolve_mentions(content: &str, bot_id: UserId, mentions: &[User]) -> String let out = USER_MENTION_RE.replace_all(&out, 
"@(user)").to_string(); out.trim().to_string() } - -fn shorten_thread_name(prompt: &str) -> String { - // Strip @(role) and @(user) placeholders left by resolve_mentions() - let cleaned = prompt.replace("@(role)", "").replace("@(user)", ""); - // Shorten GitHub URLs: https://github.com/owner/repo/issues/123 β†’ owner/repo#123 - let re = regex::Regex::new(r"https?://github\.com/([^/]+/[^/]+)/(issues|pull)/(\d+)").unwrap(); - let shortened = re.replace_all(cleaned.trim(), "$1#$3"); - let name: String = shortened.chars().take(40).collect(); - if name.len() < shortened.len() { - format!("{name}...") - } else { - name - } -} - -async fn get_or_create_thread(ctx: &Context, msg: &Message, prompt: &str) -> anyhow::Result { - let channel = msg.channel_id.to_channel(&ctx.http).await?; - if let serenity::model::channel::Channel::Guild(ref gc) = channel { - if gc.thread_metadata.is_some() { - return Ok(msg.channel_id.get()); - } - } - - let thread_name = shorten_thread_name(prompt); - - let thread = msg - .channel_id - .create_thread_from_message( - &ctx.http, - msg.id, - serenity::builder::CreateThread::new(thread_name) - .auto_archive_duration(serenity::model::channel::AutoArchiveDuration::OneDay), - ) - .await?; - - Ok(thread.id.get()) -} - - -#[cfg(test)] -mod tests { - use super::*; - - fn make_png(width: u32, height: u32) -> Vec { - let img = image::RgbImage::new(width, height); - let mut buf = Cursor::new(Vec::new()); - img.write_to(&mut buf, image::ImageFormat::Png).unwrap(); - buf.into_inner() - } - - #[test] - fn large_image_resized_to_max_dimension() { - let png = make_png(3000, 2000); - let (compressed, mime) = resize_and_compress(&png).unwrap(); - - assert_eq!(mime, "image/jpeg"); - let result = image::load_from_memory(&compressed).unwrap(); - assert!(result.width() <= IMAGE_MAX_DIMENSION_PX); - assert!(result.height() <= IMAGE_MAX_DIMENSION_PX); - } - - #[test] - fn small_image_keeps_original_dimensions() { - let png = make_png(800, 600); - let (compressed, mime) 
= resize_and_compress(&png).unwrap(); - - assert_eq!(mime, "image/jpeg"); - let result = image::load_from_memory(&compressed).unwrap(); - assert_eq!(result.width(), 800); - assert_eq!(result.height(), 600); - } - - #[test] - fn landscape_image_respects_aspect_ratio() { - let png = make_png(4000, 2000); - let (compressed, _) = resize_and_compress(&png).unwrap(); - - let result = image::load_from_memory(&compressed).unwrap(); - assert_eq!(result.width(), 1200); - assert_eq!(result.height(), 600); - } - - #[test] - fn portrait_image_respects_aspect_ratio() { - let png = make_png(2000, 4000); - let (compressed, _) = resize_and_compress(&png).unwrap(); - - let result = image::load_from_memory(&compressed).unwrap(); - assert_eq!(result.width(), 600); - assert_eq!(result.height(), 1200); - } - - #[test] - fn compressed_output_is_smaller_than_original() { - let png = make_png(3000, 2000); - let (compressed, _) = resize_and_compress(&png).unwrap(); - - assert!(compressed.len() < png.len(), "compressed {} should be < original {}", compressed.len(), png.len()); - } - - #[test] - fn gif_passes_through_unchanged() { - // Minimal valid GIF89a (1x1 pixel) - let gif: Vec = vec![ - 0x47, 0x49, 0x46, 0x38, 0x39, 0x61, // GIF89a - 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, // logical screen descriptor - 0x2C, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, // image descriptor - 0x02, 0x02, 0x44, 0x01, 0x00, // image data - 0x3B, // trailer - ]; - let (output, mime) = resize_and_compress(&gif).unwrap(); - - assert_eq!(mime, "image/gif"); - assert_eq!(output, gif); - } - - #[test] - fn invalid_data_returns_error() { - let garbage = vec![0x00, 0x01, 0x02, 0x03]; - assert!(resize_and_compress(&garbage).is_err()); - } - - // --- compose_display tests --- - - fn tool(id: &str, title: &str, state: ToolState) -> ToolEntry { - ToolEntry { id: id.to_string(), title: title.to_string(), state } - } - - #[test] - fn compose_display_at_threshold_shows_individual_lines() { - let tools = vec![ 
- tool("1", "cmd-a", ToolState::Completed), - tool("2", "cmd-b", ToolState::Completed), - tool("3", "cmd-c", ToolState::Completed), - ]; - let out = compose_display(&tools, "hello", true); - assert!(out.contains("βœ… `cmd-a`"), "should show individual tool"); - assert!(out.contains("βœ… `cmd-b`"), "should show individual tool"); - assert!(out.contains("βœ… `cmd-c`"), "should show individual tool"); - assert!(!out.contains("tool(s) completed"), "should not collapse at threshold"); - } - - #[test] - fn compose_display_above_threshold_collapses() { - let tools = vec![ - tool("1", "cmd-a", ToolState::Completed), - tool("2", "cmd-b", ToolState::Completed), - tool("3", "cmd-c", ToolState::Completed), - tool("4", "cmd-d", ToolState::Completed), - ]; - let out = compose_display(&tools, "hello", true); - assert!(out.contains("βœ… 4 tool(s) completed"), "should collapse above threshold"); - assert!(!out.contains("`cmd-a`"), "individual tools should be hidden"); - } - - #[test] - fn compose_display_mixed_completed_and_failed() { - let tools = vec![ - tool("1", "ok-1", ToolState::Completed), - tool("2", "ok-2", ToolState::Completed), - tool("3", "ok-3", ToolState::Completed), - tool("4", "fail-1", ToolState::Failed), - tool("5", "fail-2", ToolState::Failed), - ]; - let out = compose_display(&tools, "", true); - assert!(out.contains("βœ… 3 Β· ❌ 2 tool(s) completed")); - } - - #[test] - fn compose_display_running_shown_alongside_collapsed() { - let tools = vec![ - tool("1", "done-1", ToolState::Completed), - tool("2", "done-2", ToolState::Completed), - tool("3", "done-3", ToolState::Completed), - tool("4", "done-4", ToolState::Completed), - tool("5", "active", ToolState::Running), - ]; - let out = compose_display(&tools, "text", true); - assert!(out.contains("βœ… 4 tool(s) completed")); - assert!(out.contains("πŸ”§ `active`...")); - assert!(out.contains("text")); - } - - #[test] - fn compose_display_parallel_running_guard() { - let tools: Vec<_> = (0..5) - .map(|i| 
tool(&i.to_string(), &format!("run-{i}"), ToolState::Running)) - .collect(); - let out = compose_display(&tools, "", true); - assert!(out.contains("πŸ”§ 2 more running"), "should collapse excess running tools"); - assert!(out.contains("πŸ”§ `run-3`..."), "should show recent running"); - assert!(out.contains("πŸ”§ `run-4`..."), "should show recent running"); - } - - #[test] - fn compose_display_non_streaming_shows_all() { - let tools = vec![ - tool("1", "cmd-a", ToolState::Completed), - tool("2", "cmd-b", ToolState::Completed), - tool("3", "cmd-c", ToolState::Completed), - tool("4", "cmd-d", ToolState::Completed), - tool("5", "cmd-e", ToolState::Failed), - ]; - let out = compose_display(&tools, "final", false); - assert!(out.contains("βœ… `cmd-a`")); - assert!(out.contains("βœ… `cmd-d`")); - assert!(out.contains("❌ `cmd-e`")); - assert!(out.contains("final")); - assert!(!out.contains("tool(s) completed"), "non-streaming should not collapse"); - } - - #[test] - fn tail_truncation_preserves_multibyte_chars() { - let content = "δ½ ε₯½δΈ–η•ŒπŸŒabcdefghij"; - let limit = 10; - let total = content.chars().count(); - let skip = total.saturating_sub(limit); - let truncated: String = content.chars().skip(skip).collect(); - assert_eq!(truncated.chars().count(), limit); - assert!(truncated.ends_with("abcdefghij")); - } -} diff --git a/src/format.rs b/src/format.rs index 44906e7..56f0fad 100644 --- a/src/format.rs +++ b/src/format.rs @@ -42,3 +42,21 @@ pub fn split_message(text: &str, limit: usize) -> Vec { } chunks } + +/// Shorten a prompt into a thread title: collapse GitHub URLs and cap at 40 chars. 
+pub fn shorten_thread_name(prompt: &str) -> String { + use std::sync::LazyLock; + static GH_RE: LazyLock = LazyLock::new(|| { + regex::Regex::new(r"https?://github\.com/([^/]+/[^/]+)/(issues|pull)/(\d+)").unwrap() + }); + // Strip @(role) and @(user) placeholders left by resolve_mentions() + let cleaned = prompt.replace("@(role)", "").replace("@(user)", ""); + let shortened = GH_RE.replace_all(cleaned.trim(), "$1#$3"); + let name: String = shortened.chars().take(40).collect(); + if name.len() < shortened.len() { + format!("{name}...") + } else { + name + } +} + diff --git a/src/main.rs b/src/main.rs index 59330ab..52b986d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,26 @@ mod acp; +mod adapter; mod config; mod discord; mod error_display; mod format; +mod media; mod reactions; mod setup; +mod slack; mod stt; +use adapter::AdapterRouter; use clap::Parser; use serenity::prelude::*; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; -use tracing::info; +use tracing::{error, info}; #[derive(Parser)] #[command(name = "openab")] -#[command(about = "Discord bot that manages ACP agent sessions", long_about = None)] +#[command(about = "Multi-platform ACP agent broker (Discord, Slack)", long_about = None)] struct Cli { #[command(subcommand)] command: Option, @@ -51,7 +55,7 @@ async fn main() -> anyhow::Result<()> { match cmd { Commands::Setup { output } => { setup::run_setup(output.map(PathBuf::from))?; - return Ok(()); + Ok(()) } Commands::Run { config } => { let config_path = config @@ -62,22 +66,20 @@ async fn main() -> anyhow::Result<()> { info!( agent_cmd = %cfg.agent.command, pool_max = cfg.pool.max_sessions, - channels = ?cfg.discord.allowed_channels, - users = ?cfg.discord.allowed_users, + discord = cfg.discord.is_some(), + slack = cfg.slack.is_some(), reactions = cfg.reactions.enabled, - allow_bot_messages = ?cfg.discord.allow_bot_messages, "config loaded" ); + if cfg.discord.is_none() && cfg.slack.is_none() { + anyhow::bail!("no 
adapter configured β€” add [discord] and/or [slack] to config.toml"); + } + let pool = Arc::new(acp::SessionPool::new(cfg.agent, cfg.pool.max_sessions)); let ttl_secs = cfg.pool.session_ttl_hours * 3600; - let allowed_channels = parse_id_set(&cfg.discord.allowed_channels, "allowed_channels")?; - let allowed_users = parse_id_set(&cfg.discord.allowed_users, "allowed_users")?; - let trusted_bot_ids = parse_id_set(&cfg.discord.trusted_bot_ids, "trusted_bot_ids")?; - info!(channels = allowed_channels.len(), users = allowed_users.len(), trusted_bots = ?trusted_bot_ids, "parsed allowlists"); - - // Resolve STT config before constructing handler (auto-detect mutates cfg.stt) + // Resolve STT config (auto-detect GROQ_API_KEY from env) if cfg.stt.enabled { if cfg.stt.api_key.is_empty() && cfg.stt.base_url.contains("groq.com") { if let Ok(key) = std::env::var("GROQ_API_KEY") { @@ -93,23 +95,10 @@ async fn main() -> anyhow::Result<()> { info!(model = %cfg.stt.model, base_url = %cfg.stt.base_url, "STT enabled"); } - let handler = discord::Handler { - pool: pool.clone(), - allowed_channels, - allowed_users, - reactions_config: cfg.reactions, - stt_config: cfg.stt.clone(), - allow_bot_messages: cfg.discord.allow_bot_messages, - trusted_bot_ids, - }; - - let intents = GatewayIntents::GUILD_MESSAGES - | GatewayIntents::MESSAGE_CONTENT - | GatewayIntents::GUILDS; + let router = Arc::new(AdapterRouter::new(pool.clone(), cfg.reactions)); - let mut client = Client::builder(&cfg.discord.bot_token, intents) - .event_handler(handler) - .await?; + // Shutdown signal for Slack adapter + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); // Spawn cleanup task let cleanup_pool = pool.clone(); @@ -120,20 +109,91 @@ async fn main() -> anyhow::Result<()> { } }); - // Run bot until SIGINT/SIGTERM - let shard_manager = client.shard_manager.clone(); - let shutdown_pool = pool.clone(); - tokio::spawn(async move { + // Spawn Slack adapter (background task) + let slack_handle = if 
let Some(slack_cfg) = cfg.slack { + info!( + channels = slack_cfg.allowed_channels.len(), + users = slack_cfg.allowed_users.len(), + "starting slack adapter" + ); + let router = router.clone(); + let stt = cfg.stt.clone(); + Some(tokio::spawn(async move { + if let Err(e) = slack::run_slack_adapter( + slack_cfg.bot_token, + slack_cfg.app_token, + slack_cfg.allowed_channels.into_iter().collect(), + slack_cfg.allowed_users.into_iter().collect(), + stt, + router, + shutdown_rx, + ) + .await + { + error!("slack adapter error: {e}"); + } + })) + } else { + None + }; + + // Run Discord adapter (foreground, blocking) or wait for ctrl_c + if let Some(discord_cfg) = cfg.discord { + let allowed_channels = + parse_id_set(&discord_cfg.allowed_channels, "discord.allowed_channels")?; + let allowed_users = parse_id_set(&discord_cfg.allowed_users, "discord.allowed_users")?; + let trusted_bot_ids = parse_id_set(&discord_cfg.trusted_bot_ids, "discord.trusted_bot_ids")?; + info!( + channels = allowed_channels.len(), + users = allowed_users.len(), + trusted_bots = trusted_bot_ids.len(), + allow_bot_messages = ?discord_cfg.allow_bot_messages, + "starting discord adapter" + ); + + let handler = discord::Handler { + router, + allowed_channels, + allowed_users, + stt_config: cfg.stt.clone(), + adapter: std::sync::OnceLock::new(), + allow_bot_messages: discord_cfg.allow_bot_messages, + trusted_bot_ids, + }; + + let intents = GatewayIntents::GUILD_MESSAGES + | GatewayIntents::MESSAGE_CONTENT + | GatewayIntents::GUILDS; + + let mut client = Client::builder(&discord_cfg.bot_token, intents) + .event_handler(handler) + .await?; + + // Graceful Discord shutdown on ctrl_c + let shard_manager = client.shard_manager.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + info!("shutdown signal received"); + shard_manager.shutdown_all().await; + }); + + info!("discord bot running"); + client.start().await?; + } else { + // No Discord β€” just wait for ctrl_c + info!("running 
without discord, press ctrl+c to stop"); tokio::signal::ctrl_c().await.ok(); info!("shutdown signal received"); - shard_manager.shutdown_all().await; - }); - - info!("starting discord bot"); - client.start().await?; + } // Cleanup cleanup_handle.abort(); + // Signal Slack adapter to shut down gracefully + let _ = shutdown_tx.send(true); + if let Some(handle) = slack_handle { + let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; + } + let shutdown_pool = pool; shutdown_pool.shutdown().await; info!("openab shut down"); Ok(()) @@ -153,7 +213,9 @@ fn parse_id_set(raw: &[String], label: &str) -> anyhow::Result> { }) .collect(); if !raw.is_empty() && set.is_empty() { - anyhow::bail!("all {label} entries failed to parse β€” refusing to start with an empty allowlist"); + anyhow::bail!( + "all {label} entries failed to parse β€” refusing to start with an empty allowlist" + ); } Ok(set) } diff --git a/src/media.rs b/src/media.rs new file mode 100644 index 0000000..709f788 --- /dev/null +++ b/src/media.rs @@ -0,0 +1,266 @@ +use crate::acp::ContentBlock; +use crate::config::SttConfig; +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::Engine; +use image::ImageReader; +use std::io::Cursor; +use std::sync::LazyLock; +use tracing::{debug, error}; + +/// Reusable HTTP client for downloading attachments (shared across adapters). +pub static HTTP_CLIENT: LazyLock = LazyLock::new(|| { + reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("static HTTP client must build") +}); + +/// Maximum dimension (width or height) for resized images. +const IMAGE_MAX_DIMENSION_PX: u32 = 1200; + +/// JPEG quality for compressed output. +const IMAGE_JPEG_QUALITY: u8 = 75; + +/// Download an image from a URL, resize/compress it, and return as a ContentBlock. +/// Pass `auth_token` for platforms that require authentication (e.g. Slack private files). 
+pub async fn download_and_encode_image( + url: &str, + mime_hint: Option<&str>, + filename: &str, + size: u64, + auth_token: Option<&str>, +) -> Option { + const MAX_SIZE: u64 = 10 * 1024 * 1024; // 10 MB + + if url.is_empty() { + return None; + } + + let mime = mime_hint.or_else(|| { + filename + .rsplit('.') + .next() + .and_then(|ext| match ext.to_lowercase().as_str() { + "png" => Some("image/png"), + "jpg" | "jpeg" => Some("image/jpeg"), + "gif" => Some("image/gif"), + "webp" => Some("image/webp"), + _ => None, + }) + }); + + let Some(mime) = mime else { + debug!(filename, "skipping non-image attachment"); + return None; + }; + let mime = mime.split(';').next().unwrap_or(mime).trim(); + if !mime.starts_with("image/") { + debug!(filename, mime, "skipping non-image attachment"); + return None; + } + + if size > MAX_SIZE { + error!(filename, size, "image exceeds 10MB limit"); + return None; + } + + let mut req = HTTP_CLIENT.get(url); + if let Some(token) = auth_token { + req = req.header("Authorization", format!("Bearer {token}")); + } + + let response = match req.send().await { + Ok(resp) => resp, + Err(e) => { error!(url, error = %e, "download failed"); return None; } + }; + if !response.status().is_success() { + error!(url, status = %response.status(), "HTTP error downloading image"); + return None; + } + let bytes = match response.bytes().await { + Ok(b) => b, + Err(e) => { error!(url, error = %e, "read failed"); return None; } + }; + + if bytes.len() as u64 > MAX_SIZE { + error!(filename, size = bytes.len(), "downloaded image exceeds limit"); + return None; + } + + let (output_bytes, output_mime) = match resize_and_compress(&bytes) { + Ok(result) => result, + Err(e) => { + if bytes.len() > 1024 * 1024 { + error!(filename, error = %e, size = bytes.len(), "resize failed and original too large, skipping"); + return None; + } + debug!(filename, error = %e, "resize failed, using original"); + (bytes.to_vec(), mime.to_string()) + } + }; + + debug!( + filename, + 
original_size = bytes.len(), + compressed_size = output_bytes.len(), + "image processed" + ); + + let encoded = BASE64.encode(&output_bytes); + Some(ContentBlock::Image { + media_type: output_mime, + data: encoded, + }) +} + +/// Download an audio file and transcribe it via the configured STT provider. +/// Pass `auth_token` for platforms that require authentication. +pub async fn download_and_transcribe( + url: &str, + filename: &str, + mime_type: &str, + size: u64, + stt_config: &SttConfig, + auth_token: Option<&str>, +) -> Option { + const MAX_SIZE: u64 = 25 * 1024 * 1024; // 25 MB (Whisper API limit) + + if size > MAX_SIZE { + error!(filename, size, "audio exceeds 25MB limit"); + return None; + } + + let mut req = HTTP_CLIENT.get(url); + if let Some(token) = auth_token { + req = req.header("Authorization", format!("Bearer {token}")); + } + + let resp = req.send().await.ok()?; + if !resp.status().is_success() { + error!(url, status = %resp.status(), "audio download failed"); + return None; + } + let bytes = resp.bytes().await.ok()?.to_vec(); + + crate::stt::transcribe(&HTTP_CLIENT, stt_config, bytes, filename.to_string(), mime_type).await +} + +/// Resize image so longest side <= IMAGE_MAX_DIMENSION_PX, then encode as JPEG. +/// GIFs are passed through unchanged to preserve animation. 
+pub fn resize_and_compress(raw: &[u8]) -> Result<(Vec, String), image::ImageError> { + let reader = ImageReader::new(Cursor::new(raw)) + .with_guessed_format()?; + + let format = reader.format(); + + if format == Some(image::ImageFormat::Gif) { + return Ok((raw.to_vec(), "image/gif".to_string())); + } + + let img = reader.decode()?; + let (w, h) = (img.width(), img.height()); + + let img = if w > IMAGE_MAX_DIMENSION_PX || h > IMAGE_MAX_DIMENSION_PX { + let max_side = std::cmp::max(w, h); + let ratio = f64::from(IMAGE_MAX_DIMENSION_PX) / f64::from(max_side); + let new_w = (f64::from(w) * ratio) as u32; + let new_h = (f64::from(h) * ratio) as u32; + img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3) + } else { + img + }; + + let mut buf = Cursor::new(Vec::new()); + let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, IMAGE_JPEG_QUALITY); + img.write_with_encoder(encoder)?; + + Ok((buf.into_inner(), "image/jpeg".to_string())) +} + +/// Check if a MIME type is audio. 
+pub fn is_audio_mime(mime: &str) -> bool { + mime.starts_with("audio/") +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_png(width: u32, height: u32) -> Vec { + let img = image::RgbImage::new(width, height); + let mut buf = Cursor::new(Vec::new()); + img.write_to(&mut buf, image::ImageFormat::Png).unwrap(); + buf.into_inner() + } + + #[test] + fn large_image_resized_to_max_dimension() { + let png = make_png(3000, 2000); + let (compressed, mime) = resize_and_compress(&png).unwrap(); + + assert_eq!(mime, "image/jpeg"); + let result = image::load_from_memory(&compressed).unwrap(); + assert!(result.width() <= IMAGE_MAX_DIMENSION_PX); + assert!(result.height() <= IMAGE_MAX_DIMENSION_PX); + } + + #[test] + fn small_image_keeps_original_dimensions() { + let png = make_png(800, 600); + let (compressed, mime) = resize_and_compress(&png).unwrap(); + + assert_eq!(mime, "image/jpeg"); + let result = image::load_from_memory(&compressed).unwrap(); + assert_eq!(result.width(), 800); + assert_eq!(result.height(), 600); + } + + #[test] + fn landscape_image_respects_aspect_ratio() { + let png = make_png(4000, 2000); + let (compressed, _) = resize_and_compress(&png).unwrap(); + + let result = image::load_from_memory(&compressed).unwrap(); + assert_eq!(result.width(), 1200); + assert_eq!(result.height(), 600); + } + + #[test] + fn portrait_image_respects_aspect_ratio() { + let png = make_png(2000, 4000); + let (compressed, _) = resize_and_compress(&png).unwrap(); + + let result = image::load_from_memory(&compressed).unwrap(); + assert_eq!(result.width(), 600); + assert_eq!(result.height(), 1200); + } + + #[test] + fn compressed_output_is_smaller_than_original() { + let png = make_png(3000, 2000); + let (compressed, _) = resize_and_compress(&png).unwrap(); + + assert!(compressed.len() < png.len(), "compressed {} should be < original {}", compressed.len(), png.len()); + } + + #[test] + fn gif_passes_through_unchanged() { + let gif: Vec = vec![ + 0x47, 0x49, 0x46, 0x38, 0x39, 
0x61, + 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x2C, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, + 0x02, 0x02, 0x44, 0x01, 0x00, + 0x3B, + ]; + let (output, mime) = resize_and_compress(&gif).unwrap(); + + assert_eq!(mime, "image/gif"); + assert_eq!(output, gif); + } + + #[test] + fn invalid_data_returns_error() { + let garbage = vec![0x00, 0x01, 0x02, 0x03]; + assert!(resize_and_compress(&garbage).is_err()); + } +} diff --git a/src/reactions.rs b/src/reactions.rs index 683d334..8638d86 100644 --- a/src/reactions.rs +++ b/src/reactions.rs @@ -1,7 +1,5 @@ +use crate::adapter::{ChatAdapter, MessageRef}; use crate::config::{ReactionEmojis, ReactionTiming}; -use serenity::http::Http; -use serenity::model::channel::ReactionType; -use serenity::model::id::{ChannelId, MessageId}; use std::sync::Arc; use tokio::sync::Mutex; use tokio::time::Duration; @@ -21,9 +19,8 @@ fn classify_tool<'a>(name: &str, emojis: &'a ReactionEmojis) -> &'a str { } struct Inner { - http: Arc, - channel: ChannelId, - message: MessageId, + adapter: Arc, + message: MessageRef, emojis: ReactionEmojis, timing: ReactionTiming, current: String, @@ -41,16 +38,14 @@ pub struct StatusReactionController { impl StatusReactionController { pub fn new( enabled: bool, - http: Arc, - channel: ChannelId, - message: MessageId, + adapter: Arc, + message: MessageRef, emojis: ReactionEmojis, timing: ReactionTiming, ) -> Self { Self { inner: Arc::new(Mutex::new(Inner { - http, - channel, + adapter, message, emojis, timing, @@ -93,7 +88,7 @@ impl StatusReactionController { let faces = ["😊", "😎", "🫑", "πŸ€“", "😏", "✌️", "πŸ’ͺ", "🦾"]; let face = faces[rand::random::() % faces.len()]; let inner = self.inner.lock().await; - let _ = add_reaction(&inner.http, inner.channel, inner.message, face).await; + let _ = inner.adapter.add_reaction(&inner.message, face).await; } pub async fn set_error(&self) { @@ -108,7 +103,7 @@ impl StatusReactionController { cancel_timers(&mut inner); let current = 
inner.current.clone(); if !current.is_empty() { - let _ = remove_reaction(&inner.http, inner.channel, inner.message, ¤t).await; + let _ = inner.adapter.remove_reaction(&inner.message, ¤t).await; inner.current.clear(); } } @@ -121,15 +116,14 @@ impl StatusReactionController { cancel_debounce(&mut inner); let old = inner.current.clone(); inner.current = emoji.to_string(); - let http = inner.http.clone(); - let ch = inner.channel; - let msg = inner.message; + let adapter = inner.adapter.clone(); + let msg = inner.message.clone(); let new = emoji.to_string(); drop(inner); - let _ = add_reaction(&http, ch, msg, &new).await; + let _ = adapter.add_reaction(&msg, &new).await; if !old.is_empty() && old != new { - let _ = remove_reaction(&http, ch, msg, &old).await; + let _ = adapter.remove_reaction(&msg, &old).await; } self.reset_stall_timers().await; } @@ -151,14 +145,13 @@ impl StatusReactionController { if inner.finished { return; } let old = inner.current.clone(); inner.current = emoji.clone(); - let http = inner.http.clone(); - let ch = inner.channel; - let msg = inner.message; + let adapter = inner.adapter.clone(); + let msg = inner.message.clone(); drop(inner); - let _ = add_reaction(&http, ch, msg, &emoji).await; + let _ = adapter.add_reaction(&msg, &emoji).await; if !old.is_empty() && old != emoji { - let _ = remove_reaction(&http, ch, msg, &old).await; + let _ = adapter.remove_reaction(&msg, &old).await; } })); self.reset_stall_timers_inner(&mut inner); @@ -172,15 +165,14 @@ impl StatusReactionController { let old = inner.current.clone(); inner.current = emoji.to_string(); - let http = inner.http.clone(); - let ch = inner.channel; - let msg = inner.message; + let adapter = inner.adapter.clone(); + let msg = inner.message.clone(); let new = emoji.to_string(); drop(inner); - let _ = add_reaction(&http, ch, msg, &new).await; + let _ = adapter.add_reaction(&msg, &new).await; if !old.is_empty() && old != new { - let _ = remove_reaction(&http, ch, msg, &old).await; + 
let _ = adapter.remove_reaction(&msg, &old).await; } } @@ -205,13 +197,12 @@ impl StatusReactionController { if inner.finished { return; } let old = inner.current.clone(); inner.current = "πŸ₯±".to_string(); - let http = inner.http.clone(); - let ch = inner.channel; - let msg = inner.message; + let adapter = inner.adapter.clone(); + let msg = inner.message.clone(); drop(inner); - let _ = add_reaction(&http, ch, msg, "πŸ₯±").await; + let _ = adapter.add_reaction(&msg, "πŸ₯±").await; if !old.is_empty() && old != "πŸ₯±" { - let _ = remove_reaction(&http, ch, msg, &old).await; + let _ = adapter.remove_reaction(&msg, &old).await; } } })); @@ -222,13 +213,12 @@ impl StatusReactionController { if inner.finished { return; } let old = inner.current.clone(); inner.current = "😨".to_string(); - let http = inner.http.clone(); - let ch = inner.channel; - let msg = inner.message; + let adapter = inner.adapter.clone(); + let msg = inner.message.clone(); drop(inner); - let _ = add_reaction(&http, ch, msg, "😨").await; + let _ = adapter.add_reaction(&msg, "😨").await; if !old.is_empty() && old != "😨" { - let _ = remove_reaction(&http, ch, msg, &old).await; + let _ = adapter.remove_reaction(&msg, &old).await; } })); } @@ -243,13 +233,3 @@ fn cancel_timers(inner: &mut Inner) { if let Some(h) = inner.stall_soft_handle.take() { h.abort(); } if let Some(h) = inner.stall_hard_handle.take() { h.abort(); } } - -async fn add_reaction(http: &Http, ch: ChannelId, msg: MessageId, emoji: &str) -> serenity::Result<()> { - let reaction = ReactionType::Unicode(emoji.to_string()); - http.create_reaction(ch, msg, &reaction).await -} - -async fn remove_reaction(http: &Http, ch: ChannelId, msg: MessageId, emoji: &str) -> serenity::Result<()> { - let reaction = ReactionType::Unicode(emoji.to_string()); - http.delete_reaction_me(ch, msg, &reaction).await -} diff --git a/src/slack.rs b/src/slack.rs new file mode 100644 index 0000000..9779e6d --- /dev/null +++ b/src/slack.rs @@ -0,0 +1,659 @@ +use 
crate::acp::ContentBlock;
+use crate::adapter::{AdapterRouter, ChatAdapter, ChannelRef, MessageRef, SenderContext};
+use crate::config::SttConfig;
+use crate::media;
+use anyhow::{anyhow, Result};
+use async_trait::async_trait;
+use futures_util::{SinkExt, StreamExt};
+use std::collections::{HashMap, HashSet};
+use std::sync::{Arc, LazyLock};
+use tokio::sync::watch;
+use tokio_tungstenite::tungstenite;
+use tracing::{debug, error, info, warn};
+
+const SLACK_API: &str = "https://slack.com/api";
+
+/// Map a Unicode emoji to the Slack short name used by the reactions API.
+///
+/// Only the emoji listed below are known. Anything else (including custom
+/// emoji configured by the user) falls back to `grey_question`.
+///
+/// U+FE0F (emoji variation selector) is ignored, so `"✌"` and `"✌\u{fe0f}"`
+/// resolve to the same name.
+fn unicode_to_slack_emoji(unicode: &str) -> &'static str {
+    // Normalise away the variation selector before matching; the same emoji
+    // may arrive with or without it depending on where it was typed.
+    let key: String = unicode.chars().filter(|&c| c != '\u{fe0f}').collect();
+    match key.as_str() {
+        "👀" => "eyes",
+        "🤔" => "thinking_face",
+        "🔥" => "fire",
+        "👨\u{200d}💻" => "technologist",
+        "⚡" => "zap",
+        "🆗" => "ok",
+        "😱" => "scream",
+        "🚫" => "no_entry_sign",
+        "😊" => "blush",
+        "😎" => "sunglasses",
+        "🫡" => "saluting_face",
+        "🤓" => "nerd_face",
+        "😏" => "smirk",
+        "✌" => "v",
+        "💪" => "muscle",
+        "🦾" => "mechanical_arm",
+        "🥱" => "yawning_face",
+        "😨" => "fearful",
+        "✅" => "white_check_mark",
+        "❌" => "x",
+        "🔧" => "wrench",
+        _ => "grey_question",
+    }
+}
+
+// --- SlackAdapter: implements ChatAdapter for Slack ---
+
+/// TTL for cached user display names (5 minutes).
+const USER_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(300);
+
+/// Slack Web API client that implements [`ChatAdapter`].
+pub struct SlackAdapter {
+    /// Shared HTTP client for all Web API calls.
+    client: reqwest::Client,
+    /// Bot token sent as the `Authorization: Bearer` credential.
+    bot_token: String,
+    /// The bot's own user ID, fetched lazily via `auth.test`.
+    bot_user_id: tokio::sync::OnceCell<String>,
+    /// user ID -> (resolved name, time it was cached).
+    user_cache: tokio::sync::Mutex<HashMap<String, (String, tokio::time::Instant)>>,
+}
+
+impl SlackAdapter {
+    /// Create an adapter that authenticates with `bot_token`.
+    pub fn new(bot_token: String) -> Self {
+        Self {
+            client: reqwest::Client::new(),
+            bot_token,
+            bot_user_id: tokio::sync::OnceCell::new(),
+            user_cache: tokio::sync::Mutex::new(HashMap::new()),
+        }
+    }
+
+    /// Get the bot's own Slack user ID (cached after first call).
+    ///
+    /// Returns `None` when `auth.test` fails; a failure is not cached, so the
+    /// next call tries again.
+    async fn get_bot_user_id(&self) -> Option<&str> {
+        self.bot_user_id
+            .get_or_try_init(|| async {
+                let resp = self
+                    .api_post("auth.test", serde_json::json!({}))
+                    .await
+                    .map_err(|e| anyhow!("auth.test failed: {e}"))?;
+                resp["user_id"]
+                    .as_str()
+                    .map(|s| s.to_string())
+                    .ok_or_else(|| anyhow!("no user_id in auth.test response"))
+            })
+            .await
+            .ok()
+            .map(|s| s.as_str())
+    }
+
+    /// Turn a decoded Slack response into `Ok(json)`, or an error naming the
+    /// method and Slack's `error` code when `ok` is not `true`.
+    fn check_ok(method: &str, json: serde_json::Value) -> Result<serde_json::Value> {
+        if json["ok"].as_bool() == Some(true) {
+            return Ok(json);
+        }
+        let err = json["error"].as_str().unwrap_or("unknown error");
+        Err(anyhow!("Slack API {method}: {err}"))
+    }
+
+    /// Call a Web API method with a JSON body (for methods that accept JSON).
+    ///
+    /// # Errors
+    /// Fails on transport/decoding errors and when Slack answers `ok != true`.
+    async fn api_post(&self, method: &str, body: serde_json::Value) -> Result<serde_json::Value> {
+        let resp = self
+            .client
+            .post(format!("{SLACK_API}/{method}"))
+            .header("Authorization", format!("Bearer {}", self.bot_token))
+            .header("Content-Type", "application/json; charset=utf-8")
+            .json(&body)
+            .send()
+            .await?;
+
+        Self::check_ok(method, resp.json().await?)
+    }
+
+    /// Call a read-style Web API method with its arguments in the query string.
+    ///
+    /// Some Slack methods (e.g. `users.info`) are documented as accepting only
+    /// form/query arguments, not JSON bodies, so they must not go through
+    /// [`Self::api_post`].
+    ///
+    /// # Errors
+    /// Fails on URL construction, transport/decoding errors and `ok != true`.
+    async fn api_get(&self, method: &str, params: &[(&str, &str)]) -> Result<serde_json::Value> {
+        let url = reqwest::Url::parse_with_params(&format!("{SLACK_API}/{method}"), params)?;
+        let resp = self
+            .client
+            .get(url)
+            .header("Authorization", format!("Bearer {}", self.bot_token))
+            .send()
+            .await?;
+
+        Self::check_ok(method, resp.json().await?)
+    }
+
+    /// Shared body of `add_reaction` / `remove_reaction`.
+    ///
+    /// `benign` is the Slack error code that means "already in the desired
+    /// state" and is therefore treated as success.
+    async fn change_reaction(
+        &self,
+        method: &str,
+        msg: &MessageRef,
+        emoji: &str,
+        benign: &str,
+    ) -> Result<()> {
+        let body = serde_json::json!({
+            "channel": msg.channel.channel_id,
+            "timestamp": msg.message_id,
+            "name": unicode_to_slack_emoji(emoji),
+        });
+        match self.api_post(method, body).await {
+            Ok(_) => Ok(()),
+            Err(e) if e.to_string().contains(benign) => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
+
+    /// Resolve a Slack user ID to display name via users.info API.
+    /// Results are cached for 5 minutes to avoid hitting Slack rate limits.
+    ///
+    /// Preference order: `profile.display_name`, `profile.real_name`, `name`
+    /// (empty strings are skipped). Returns `None` if the lookup fails or the
+    /// response has no usable name.
+    async fn resolve_user_name(&self, user_id: &str) -> Option<String> {
+        // Check cache first; the guard is dropped before any network call.
+        {
+            let cache = self.user_cache.lock().await;
+            if let Some((name, ts)) = cache.get(user_id) {
+                if ts.elapsed() < USER_CACHE_TTL {
+                    return Some(name.clone());
+                }
+            }
+        }
+
+        let resp = match self.api_get("users.info", &[("user", user_id)]).await {
+            Ok(resp) => resp,
+            Err(e) => {
+                // Best-effort: callers fall back to the raw user ID.
+                debug!(user_id, "users.info lookup failed: {e}");
+                return None;
+            }
+        };
+        let user = resp.get("user")?;
+        let profile = user.get("profile")?;
+        let non_empty = |v: &serde_json::Value, key: &str| -> Option<String> {
+            v.get(key)
+                .and_then(|s| s.as_str())
+                .filter(|s| !s.is_empty())
+                .map(str::to_string)
+        };
+        let resolved = non_empty(profile, "display_name")
+            .or_else(|| non_empty(profile, "real_name"))
+            .or_else(|| user.get("name").and_then(|v| v.as_str()).map(str::to_string))?;
+
+        // Cache the result
+        self.user_cache.lock().await.insert(
+            user_id.to_string(),
+            (resolved.clone(), tokio::time::Instant::now()),
+        );
+
+        Some(resolved)
+    }
+}
+
+#[async_trait]
+impl ChatAdapter for SlackAdapter {
+    fn platform(&self) -> &'static str {
+        "slack"
+    }
+
+    fn message_limit(&self) -> usize {
+        4000
+    }
+
+    /// Post `content` (converted to mrkdwn) to `channel`, inside its thread
+    /// when `thread_id` is set, and return a reference to the new message.
+    async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result<MessageRef> {
+        let mrkdwn = markdown_to_mrkdwn(content);
+        let mut body = serde_json::json!({
+            "channel": channel.channel_id,
+            "text": mrkdwn,
+        });
+        if let Some(thread_ts) = &channel.thread_id {
+            body["thread_ts"] = serde_json::Value::String(thread_ts.clone());
+        }
+        let resp = self.api_post("chat.postMessage", body).await?;
+        let ts = resp["ts"]
+            .as_str()
+            .ok_or_else(|| anyhow!("no ts in chat.postMessage response"))?;
+        Ok(MessageRef {
+            channel: ChannelRef {
+                platform: "slack".into(),
+                channel_id: channel.channel_id.clone(),
+                thread_id: channel.thread_id.clone(),
+                parent_id: None,
+            },
+            message_id: ts.to_string(),
+        })
+    }
+
+    /// Replace the text of an existing message with `content` (as mrkdwn).
+    async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()> {
+        let mrkdwn = markdown_to_mrkdwn(content);
+        self.api_post(
+            "chat.update",
+            serde_json::json!({
+                "channel": msg.channel.channel_id,
+                "ts": msg.message_id,
+                "text": mrkdwn,
+            }),
+        )
+        .await?;
+        Ok(())
+    }
+
+    async fn create_thread(
+        &self,
+        channel: &ChannelRef,
+        trigger_msg: &MessageRef,
+        _title: &str,
+    ) -> Result<ChannelRef> {
+        // Slack threads are implicit — posting with thread_ts creates/continues a thread.
+        // No API call is made here; `_title` is unused.
+        Ok(ChannelRef {
+            platform: "slack".into(),
+            channel_id: channel.channel_id.clone(),
+            thread_id: Some(trigger_msg.message_id.clone()),
+            parent_id: None,
+        })
+    }
+
+    /// Add `emoji` to `msg`; an `already_reacted` error counts as success.
+    async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()> {
+        self.change_reaction("reactions.add", msg, emoji, "already_reacted")
+            .await
+    }
+
+    /// Remove `emoji` from `msg`; a `no_reaction` error counts as success.
+    async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()> {
+        self.change_reaction("reactions.remove", msg, emoji, "no_reaction")
+            .await
+    }
+}
+
+// --- Socket Mode event loop ---
+
+/// Run the Slack adapter using Socket Mode (persistent WebSocket, no public URL needed).
+/// Reconnects automatically on disconnect.
+///
+/// Each pass of the outer loop fetches a fresh WebSocket URL from
+/// `apps.connections.open`, connects, and then processes envelopes until the
+/// stream ends, errors, or the server closes it. After that (or after a failed
+/// connect) it sleeps 5 seconds and starts over.
+///
+/// Returns `Ok(())` once the shutdown flag is observed as `true` before a
+/// (re)connect, or as soon as `shutdown_rx.changed()` resolves while connected.
+pub async fn run_slack_adapter(
+    bot_token: String,
+    app_token: String,
+    allowed_channels: HashSet,
+    allowed_users: HashSet,
+    stt_config: SttConfig,
+    router: Arc,
+    mut shutdown_rx: watch::Receiver,
+) -> Result<()> {
+    // NOTE(review): several generic arguments in this signature look truncated in
+    // this copy of the patch (`HashSet,`, `Arc,`, `watch::Receiver,`) — confirm
+    // against the real file.
+    let adapter = Arc::new(SlackAdapter::new(bot_token.clone()));
+
+    loop {
+        // Check for shutdown before (re)connecting
+        if *shutdown_rx.borrow() {
+            info!("Slack adapter shutting down");
+            return Ok(());
+        }
+
+        let ws_url = match get_socket_mode_url(&app_token).await {
+            Ok(url) => url,
+            Err(e) => {
+                error!("failed to get Socket Mode URL: {e}");
+                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+                continue;
+            }
+        };
+        // NOTE(review): the full URL is logged at info level; it presumably carries
+        // a connection ticket — confirm this is acceptable for the log sink.
+        info!(url = %ws_url, "connecting to Slack Socket Mode");
+
+        match tokio_tungstenite::connect_async(&ws_url).await {
+            Ok((ws_stream, _)) => {
+                info!("Slack Socket Mode connected");
+                let (mut write, mut read) = ws_stream.split();
+
+                loop {
+                    tokio::select! {
+                        msg_result = read.next() => {
+                            // `None` means the stream ended: leave the inner loop and reconnect.
+                            let Some(msg_result) = msg_result else { break };
+                            match msg_result {
+                                Ok(tungstenite::Message::Text(text)) => {
+                                    // Frames that are not valid JSON are dropped silently.
+                                    let envelope: serde_json::Value =
+                                        match serde_json::from_str(&text) {
+                                            Ok(v) => v,
+                                            Err(_) => continue,
+                                        };
+
+                                    // Acknowledge the envelope immediately
+                                    // (before any routing; a failed ack send is ignored).
+                                    if let Some(envelope_id) = envelope["envelope_id"].as_str() {
+                                        let ack = serde_json::json!({"envelope_id": envelope_id});
+                                        let _ = write
+                                            .send(tungstenite::Message::Text(ack.to_string()))
+                                            .await;
+                                    }
+
+                                    // Route events
+                                    if envelope["type"].as_str() == Some("events_api") {
+                                        let event = &envelope["payload"]["event"];
+                                        let event_type = event["type"].as_str().unwrap_or("");
+                                        match event_type {
+                                            "app_mention" => {
+                                                // Clone everything the task needs so it can outlive
+                                                // this loop iteration; handling runs concurrently.
+                                                let event = event.clone();
+                                                let adapter = adapter.clone();
+                                                let bot_token = bot_token.clone();
+                                                let allowed_channels = allowed_channels.clone();
+                                                let allowed_users = allowed_users.clone();
+                                                let stt_config = stt_config.clone();
+                                                let router = router.clone();
+                                                tokio::spawn(async move {
+                                                    handle_message(
+                                                        &event,
+                                                        true,
+                                                        &adapter,
+                                                        &bot_token,
+                                                        &allowed_channels,
+                                                        &allowed_users,
+                                                        &stt_config,
+                                                        &router,
+                                                    )
+                                                    .await;
+                                                });
+                                            }
+                                            "message" => {
+                                                // Handle thread follow-ups without @mention.
+                                                // Skip bot messages and subtypes that aren't real user messages.
+                                                let has_thread = event["thread_ts"].is_string();
+                                                let is_bot = event["bot_id"].is_string()
+                                                    || event["subtype"].as_str() == Some("bot_message");
+                                                let subtype = event["subtype"].as_str().unwrap_or("");
+                                                // `has_files` is only used in the debug log below.
+                                                let has_files = event["files"].is_array();
+                                                // Skip messages that @mention the bot — app_mention handles those
+                                                let msg_text = event["text"].as_str().unwrap_or("");
+                                                // If the bot's user ID cannot be resolved, the message is
+                                                // treated as not mentioning the bot.
+                                                let mentions_bot = if let Some(bot_id) = adapter.get_bot_user_id().await {
+                                                    msg_text.contains(&format!("<@{bot_id}>"))
+                                                } else {
+                                                    false
+                                                };
+                                                debug!(
+                                                    has_thread,
+                                                    is_bot,
+                                                    subtype,
+                                                    has_files,
+                                                    mentions_bot,
+                                                    text = msg_text,
+                                                    "message event received"
+                                                );
+                                                let skip_subtype = matches!(subtype,
+                                                    "message_changed" | "message_deleted" |
+                                                    "channel_join" | "channel_leave" |
+                                                    "channel_topic" | "channel_purpose"
+                                                );
+                                                // Only non-bot thread replies that do not mention the bot
+                                                // are forwarded from this arm.
+                                                if has_thread && !is_bot && !skip_subtype && !mentions_bot {
+                                                    let event = event.clone();
+                                                    let adapter = adapter.clone();
+                                                    let bot_token = bot_token.clone();
+                                                    let allowed_channels = allowed_channels.clone();
+                                                    let allowed_users = allowed_users.clone();
+                                                    let stt_config = stt_config.clone();
+                                                    let router = router.clone();
+                                                    tokio::spawn(async move {
+                                                        handle_message(
+                                                            &event,
+                                                            false,
+                                                            &adapter,
+                                                            &bot_token,
+                                                            &allowed_channels,
+                                                            &allowed_users,
+                                                            &stt_config,
+                                                            &router,
+                                                        )
+                                                        .await;
+                                                    });
+                                                }
+                                            }
+                                            _ => {}
+                                        }
+                                    }
+                                }
+                                Ok(tungstenite::Message::Ping(data)) => {
+                                    // Answer pings with the same payload; send errors are ignored.
+                                    let _ = write.send(tungstenite::Message::Pong(data)).await;
+                                }
+                                Ok(tungstenite::Message::Close(_)) => {
+                                    warn!("Slack Socket Mode connection closed by server");
+                                    break;
+                                }
+                                Err(e) => {
+                                    error!("Socket Mode read error: {e}");
+                                    break;
+                                }
+                                // Any other frame kind is ignored.
+                                _ => {}
+                            }
+                        }
+                        // Any resolution of `changed()` (its result is discarded) is
+                        // treated as a shutdown request.
+                        _ = shutdown_rx.changed() => {
+                            info!("Slack adapter received shutdown signal");
+                            let _ = write.send(tungstenite::Message::Close(None)).await;
+                            return Ok(());
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                error!("failed to connect to Slack Socket Mode: {e}");
+            }
+        }
+
+        // Reached after a failed connect or after the inner loop breaks.
+        warn!("reconnecting to Slack Socket Mode in 5s...");
+        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+    }
+}
+
+/// Call apps.connections.open to get a WebSocket URL for Socket Mode.
+///
+/// Authenticates with `app_token` as a Bearer token and sends no request body.
+///
+/// # Errors
+/// Fails on transport or JSON decoding errors, when the response's `ok` field
+/// is not `true`, or when the response has no string `url` field.
+async fn get_socket_mode_url(app_token: &str) -> Result {
+    // A new HTTP client is built on every call (i.e. on every reconnect).
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{SLACK_API}/apps.connections.open"))
+        .header("Authorization", format!("Bearer {app_token}"))
+        .header("Content-Type", "application/x-www-form-urlencoded")
+        .send()
+        .await?;
+    let json: serde_json::Value = resp.json().await?;
+    if json["ok"].as_bool() != Some(true) {
+        let err = json["error"].as_str().unwrap_or("unknown");
+        return Err(anyhow!("apps.connections.open: {err}"));
+    }
+    json["url"]
+        .as_str()
+        .map(|s| s.to_string())
+        .ok_or_else(|| anyhow!("no url in apps.connections.open response"))
+}
+
+/// Validate one Slack event, collect its attachments and hand it to the router.
+///
+/// * `event` — the `payload.event` object from a Socket Mode envelope.
+/// * `is_mention` — `true` for `app_mention` events (mentions are stripped from
+///   the text), `false` for thread follow-ups (a bot-loop check runs first).
+/// * `bot_token` — passed to the media helpers for downloading private files.
+/// * `allowed_channels` / `allowed_users` — allow-lists; an empty set allows all.
+///
+/// Returns nothing: events missing `channel`, `user`, `text` or `ts`, events
+/// from disallowed channels, and events with neither text nor files are
+/// dropped silently; router errors are logged.
+#[allow(clippy::too_many_arguments)]
+async fn handle_message(
+    event: &serde_json::Value,
+    is_mention: bool,
+    adapter: &Arc,
+    bot_token: &str,
+    allowed_channels: &HashSet,
+    allowed_users: &HashSet,
+    stt_config: &SttConfig,
+    router: &Arc,
+) {
+    let channel_id = match event["channel"].as_str() {
+        Some(ch) => ch.to_string(),
+        None => return,
+    };
+    let user_id = match event["user"].as_str() {
+        Some(u) => u.to_string(),
+        None => return,
+    };
+    let text = match event["text"].as_str() {
+        Some(t) => t.to_string(),
+        None => return,
+    };
+    let ts = match event["ts"].as_str() {
+        Some(ts) => ts.to_string(),
+        None => return,
+    };
+    let thread_ts = event["thread_ts"].as_str().map(|s| s.to_string());
+
+    // Check allowed channels (empty = allow all)
+    if !allowed_channels.is_empty() && !allowed_channels.contains(&channel_id) {
+        return;
+    }
+
+    // Check allowed users (empty = allow all); a denied user gets a reaction
+    // on their message instead of a reply.
+    if !allowed_users.is_empty() && !allowed_users.contains(&user_id) {
+        tracing::info!(user_id, "denied Slack user, ignoring");
+        let msg_ref = MessageRef {
+            channel: ChannelRef {
+                platform: "slack".into(),
+                channel_id: channel_id.clone(),
+                thread_id: thread_ts.clone(),
+                parent_id: None,
+            },
+            message_id: ts.clone(),
+        };
+        let _ = adapter.add_reaction(&msg_ref, "🚫").await;
+        return;
+    }
+
+    // Strip bot mention from text only for @mention events
+    let prompt = if is_mention {
+        strip_slack_mention(&text)
+    } else {
+        // Thread follow-up: check for bot loop before processing
+        if let Some(ref tts) = thread_ts {
+            if is_bot_loop(adapter, &channel_id, tts).await {
+                tracing::warn!(channel_id, thread_ts = tts, "bot loop detected, ignoring");
+                return;
+            }
+        }
+        text.trim().to_string()
+    };
+
+    // Process file attachments (images, audio)
+    let files = event["files"].as_array();
+    let has_files = files.is_some_and(|f| !f.is_empty());
+
+    // Nothing to send: no text left and no attachments.
+    if prompt.is_empty() && !has_files {
+        return;
+    }
+
+    let mut extra_blocks = Vec::new();
+    if let Some(files) = files {
+        for file in files {
+            let mimetype = file["mimetype"].as_str().unwrap_or("");
+            let filename = file["name"].as_str().unwrap_or("file");
+            let size = file["size"].as_u64().unwrap_or(0);
+            // Slack private files require Bearer token to download
+            let url = file["url_private_download"]
+                .as_str()
+                .or_else(|| file["url_private"].as_str())
+                .unwrap_or("");
+
+            // Files without a download URL are skipped.
+            if url.is_empty() {
+                continue;
+            }
+
+            if media::is_audio_mime(mimetype) {
+                if stt_config.enabled {
+                    if let Some(transcript) = media::download_and_transcribe(
+                        url,
+                        filename,
+                        mimetype,
+                        size,
+                        stt_config,
+                        Some(bot_token),
+                    ).await {
+                        debug!(filename, chars = transcript.len(), "voice transcript injected");
+                        // Transcripts go to the front of the list, ahead of any images.
+                        extra_blocks.insert(0, ContentBlock::Text {
+                            text: format!("[Voice message transcript]: {transcript}"),
+                        });
+                    }
+                } else {
+                    debug!(filename, "skipping audio attachment (STT disabled)");
+                }
+            } else if let Some(block) = media::download_and_encode_image(
+                // Every non-audio file is offered to the image helper; files for
+                // which it returns `None` are dropped.
+                url,
+                Some(mimetype),
+                filename,
+                size,
+                Some(bot_token),
+            ).await {
+                debug!(filename, "adding image attachment");
+                extra_blocks.push(block);
+            }
+        }
+    }
+
+    // Resolve Slack display name (best-effort, fallback to user_id)
+    let display_name = adapter
+        .resolve_user_name(&user_id)
+        .await
+        .unwrap_or_else(|| user_id.clone());
+
+    let sender = SenderContext {
+        schema: "openab.sender.v1".into(),
+        sender_id: user_id.clone(),
+        sender_name: display_name.clone(),
+        display_name,
+        channel: "slack".into(),
+        channel_id: channel_id.clone(),
+        is_bot: false,
+    };
+
+    let trigger_msg = MessageRef {
+        channel: ChannelRef {
+            platform: "slack".into(),
+            channel_id: channel_id.clone(),
+            thread_id: thread_ts.clone(),
+            parent_id: None,
+        },
+        message_id: ts.clone(),
+    };
+
+    // Determine thread: if already in a thread, continue it; otherwise start a new thread
+    // rooted at the triggering message's own timestamp.
+    let thread_channel = ChannelRef {
+        platform: "slack".into(),
+        channel_id: channel_id.clone(),
+        thread_id: Some(thread_ts.unwrap_or(ts)),
+        parent_id: None,
+    };
+
+    let adapter_dyn: Arc = adapter.clone();
+    if let Err(e) = router
+        .handle_message(&adapter_dyn, &thread_channel, &sender, &prompt, extra_blocks, &trigger_msg)
+        .await
+    {
+        error!("Slack handle_message error: {e}");
+    }
+}
+
+// Matches Slack user mentions of the form `<@ID>`, where ID is uppercase
+// letters and digits; used by `strip_slack_mention`.
+static SLACK_MENTION_RE: LazyLock =
+    LazyLock::new(|| regex::Regex::new(r"<@[A-Z0-9]+>").unwrap());
+
+/// Hard cap on consecutive bot messages in a thread.
+/// Mirrors Discord's MAX_CONSECUTIVE_BOT_TURNS to prevent runaway loops.
+const MAX_CONSECUTIVE_BOT_TURNS: usize = 10;
+
+/// Check if the last N messages in a Slack thread are all from bots.
+///
+/// Fetches up to `MAX_CONSECUTIVE_BOT_TURNS + 1` messages of the thread rooted
+/// at `thread_ts` (the first one is the thread parent and is not counted) and
+/// returns `true` only when at least `MAX_CONSECUTIVE_BOT_TURNS` replies came
+/// back and the last `MAX_CONSECUTIVE_BOT_TURNS` of them all carry a `bot_id`
+/// or the `bot_message` subtype.
+///
+/// Fails open: any request, decoding or Slack-side error yields `false`.
+///
+/// NOTE(review): this is only called for thread replies that were already
+/// classified as non-bot, and it depends on which slice of the thread Slack
+/// returns for `limit` — confirm the window really covers the most recent
+/// replies and that the triggering reply is accounted for.
+async fn is_bot_loop(adapter: &SlackAdapter, channel: &str, thread_ts: &str) -> bool {
+    // `conversations.replies` is a read method that takes its arguments as
+    // query/form parameters rather than a JSON body, so build a GET request
+    // instead of going through the JSON helper.
+    let limit = (MAX_CONSECUTIVE_BOT_TURNS + 1).to_string();
+    let Ok(url) = reqwest::Url::parse_with_params(
+        &format!("{SLACK_API}/conversations.replies"),
+        &[
+            ("channel", channel),
+            ("ts", thread_ts),
+            ("limit", limit.as_str()),
+            ("inclusive", "true"),
+        ],
+    ) else {
+        return false;
+    };
+
+    let sent = adapter
+        .client
+        .get(url)
+        .header("Authorization", format!("Bearer {}", adapter.bot_token))
+        .send()
+        .await;
+    let Ok(resp) = sent else { return false }; // fail-open on transport error
+    let Ok(json) = resp.json::<serde_json::Value>().await else { return false };
+    if json["ok"].as_bool() != Some(true) {
+        debug!(
+            channel,
+            thread_ts,
+            error = json["error"].as_str().unwrap_or("unknown error"),
+            "conversations.replies failed; skipping bot-loop check"
+        );
+        return false; // fail-open on API error
+    }
+    let Some(messages) = json["messages"].as_array() else { return false };
+
+    // Skip the first message (thread parent), then look at the tail.
+    let replies = messages.get(1..).unwrap_or(&[]);
+    if replies.len() < MAX_CONSECUTIVE_BOT_TURNS {
+        return false;
+    }
+
+    replies
+        .iter()
+        .rev()
+        .take(MAX_CONSECUTIVE_BOT_TURNS)
+        .all(|m| m["bot_id"].is_string() || m["subtype"].as_str() == Some("bot_message"))
+}
+
+/// Remove every `<@USERID>` mention from `text` and trim surrounding whitespace.
+fn strip_slack_mention(text: &str) -> String {
+    SLACK_MENTION_RE.replace_all(text, "").trim().to_string()
+}
+
+/// Convert Markdown (as output by Claude Code) to Slack mrkdwn format.
+///
+/// Outside fenced code blocks: `**bold**` → `*bold*`, `*italic*` → `_italic_`,
+/// `[text](url)` → `<url|text>`, `# heading` → `*heading*`.
+/// Inside fenced code blocks the content is copied verbatim, except that a
+/// language tag directly after the opening fence is dropped (```` ```rust ````
+/// → ```` ``` ````). An unterminated fence is treated as code up to the end of
+/// the text.
+fn markdown_to_mrkdwn(text: &str) -> String {
+    static BOLD_RE: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"\*\*(.+?)\*\*").unwrap());
+    // Emphasis must start and end on a non-space character and stay on one
+    // line, so `* list item` bullets and `2 * 3 * 4` are left alone.
+    static ITALIC_RE: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"\*([^\s*](?:[^*\n]*[^\s*])?)\*").unwrap());
+    static LINK_RE: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"\[([^\]]+)\]\(([^)]+)\)").unwrap());
+    static HEADING_RE: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"(?m)^#{1,6}\s+(.+)$").unwrap());
+    static FENCE_LANG_RE: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"^\w+\n").unwrap());
+
+    // Converts one run of prose that contains no fenced code.
+    let convert_prose = |segment: &str| -> String {
+        // Order: bold first (** → placeholder), then italic (* → _), then
+        // headings (→ placeholder), and only then restore placeholders to `*`.
+        let s = BOLD_RE.replace_all(segment, "\x01$1\x02");
+        let s = ITALIC_RE.replace_all(&s, "_${1}_");
+        // A heading becomes bold as a whole; bold markers already inside it are
+        // dropped so the result is `*title*` rather than `**title**`.
+        let s = HEADING_RE.replace_all(&s, |caps: &regex::Captures| {
+            format!("\x01{}\x02", caps[1].replace(['\x01', '\x02'], ""))
+        });
+        let s = s.replace(['\x01', '\x02'], "*");
+        LINK_RE.replace_all(&s, "<$2|$1>").into_owned() // [text](url) → <url|text>
+    };
+
+    let mut out = String::with_capacity(text.len());
+    // Splitting on the fence marker alternates prose (even index) and code (odd index).
+    for (idx, segment) in text.split("```").enumerate() {
+        if idx > 0 {
+            out.push_str("```");
+        }
+        if idx % 2 == 1 {
+            // Inside a fence: only strip the language tag.
+            out.push_str(&FENCE_LANG_RE.replace(segment, "\n"));
+        } else {
+            out.push_str(&convert_prose(segment));
+        }
+    }
+    out
+}