diff --git a/Cargo.lock b/Cargo.lock
index 11a00e0..258ce02 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -113,9 +113,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
-version = "2.11.0"
+version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af"
+checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3"
[[package]]
name = "block-buffer"
@@ -589,9 +589,9 @@ dependencies = [
[[package]]
name = "hyper-rustls"
-version = "0.27.8"
+version = "0.27.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2b52f86d1d4bc0d6b4e6826d960b1b333217e07d36b882dca570a5e1c48895b"
+checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f"
dependencies = [
"http",
"hyper",
@@ -829,9 +829,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
-version = "0.2.184"
+version = "0.2.185"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af"
+checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f"
[[package]]
name = "litemap"
@@ -960,11 +960,13 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "openab"
-version = "0.7.4"
+version = "0.7.6"
dependencies = [
"anyhow",
+ "async-trait",
"base64",
"clap",
+ "futures-util",
"image",
"libc",
"rand 0.8.5",
@@ -975,6 +977,7 @@ dependencies = [
"serde_json",
"serenity",
"tokio",
+ "tokio-tungstenite",
"toml",
"tracing",
"tracing-subscriber",
@@ -1114,7 +1117,7 @@ dependencies = [
"bytes",
"getrandom 0.3.4",
"lru-slab",
- "rand 0.9.3",
+ "rand 0.9.4",
"ring",
"rustc-hash",
"rustls 0.23.38",
@@ -1174,9 +1177,9 @@ dependencies = [
[[package]]
name = "rand"
-version = "0.9.3"
+version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ec095654a25171c2124e9e3393a930bddbffdc939556c914957a4c3e0a87166"
+checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.5",
@@ -1365,7 +1368,7 @@ dependencies = [
"once_cell",
"ring",
"rustls-pki-types",
- "rustls-webpki 0.103.11",
+ "rustls-webpki 0.103.12",
"subtle",
"zeroize",
]
@@ -1393,9 +1396,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
-version = "0.103.11"
+version = "0.103.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "20a6af516fea4b20eccceaf166e8aa666ac996208e8a644ce3ef5aa783bc7cd4"
+checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06"
dependencies = [
"ring",
"rustls-pki-types",
@@ -1760,9 +1763,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.51.1"
+version = "1.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c"
+checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776"
dependencies = [
"bytes",
"libc",
diff --git a/Cargo.toml b/Cargo.toml
index 8687438..419bade 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,8 @@ serenity = { version = "0.12", default-features = false, features = ["client", "
uuid = { version = "1", features = ["v4"] }
regex = "1"
anyhow = "1"
+async-trait = "0.1"
+futures-util = "0.3"
rand = "0.8"
clap = { version = "4", features = ["derive"] }
rpassword = "7"
@@ -22,3 +24,4 @@ base64 = "0.22"
image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] }
unicode-width = "0.2"
libc = "0.2"
+tokio-tungstenite = { version = "0.21", features = ["rustls-tls-webpki-roots"] }
diff --git a/README.md b/README.md
index 33c5c33..bf79e83 100644
--- a/README.md
+++ b/README.md
@@ -4,15 +4,18 @@

-A lightweight, secure, cloud-native ACP harness that bridges Discord and any [Agent Client Protocol](https://github.com/anthropics/agent-protocol)-compatible coding CLI (Kiro CLI, Claude Code, Codex, Gemini, OpenCode, Copilot CLI, etc.) over stdio JSON-RPC β delivering the next-generation development experience.
+A lightweight, secure, cloud-native ACP harness that bridges **Discord, Slack**, and any [Agent Client Protocol](https://github.com/anthropics/agent-protocol)-compatible coding CLI (Kiro CLI, Claude Code, Codex, Gemini, OpenCode, Copilot CLI, etc.) over stdio JSON-RPC β delivering the next-generation development experience.
πͺΌ **Join our community!** Come say hi on Discord β we'd love to have you: **[πͺΌ OpenAB β Official](https://discord.gg/YNksK9M6)** π
```
ββββββββββββββββ Gateway WS ββββββββββββββββ ACP stdio ββββββββββββββββ
-β Discord ββββββββββββββββΊβ openab ββββββββββββββββΊβ coding CLI β
-β User β β (Rust) ββββ JSON-RPC βββ (acp mode) β
-ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
+β Discord ββββββββββββββββΊβ ββββββββββββββββΊβ coding CLI β
+β User β β openab ββββ JSON-RPC βββ (acp mode) β
+ββββββββββββββββ€ Socket Mode β (Rust) β ββββββββββββββββ
+β Slack ββββββββββββββββΊβ β
+β User β ββββββββββββββββ
+ββββββββββββββββ
```
## Demo
@@ -21,6 +24,7 @@ A lightweight, secure, cloud-native ACP harness that bridges Discord and any [Ag
## Features
+- **Multi-platform** β supports Discord and Slack, run one or both simultaneously
- **Pluggable agent backend** β swap between Kiro CLI, Claude Code, Codex, Gemini, OpenCode, Copilot CLI via config
- **@mention trigger** β mention the bot in an allowed channel to start a conversation
- **Thread-based multi-turn** β auto-creates threads; no @mention needed for follow-ups
@@ -33,10 +37,22 @@ A lightweight, secure, cloud-native ACP harness that bridges Discord and any [Ag
## Quick Start
-### 1. Create a Discord Bot
+### 1. Create a Bot
+
+
+Discord
See [docs/discord-bot-howto.md](docs/discord-bot-howto.md) for a detailed step-by-step guide.
+
+
+
+Slack
+
+See [docs/slack-bot-howto.md](docs/slack-bot-howto.md) for a detailed step-by-step guide.
+
+
+
### 2. Install with Helm (Kiro CLI β default)
```bash
@@ -46,6 +62,13 @@ helm repo update
helm install openab openab/openab \
--set agents.kiro.discord.botToken="$DISCORD_BOT_TOKEN" \
--set-string 'agents.kiro.discord.allowedChannels[0]=YOUR_CHANNEL_ID'
+
+# Slack
+helm install openab openab/openab \
+ --set agents.kiro.slack.enabled=true \
+ --set agents.kiro.slack.botToken="$SLACK_BOT_TOKEN" \
+ --set agents.kiro.slack.appToken="$SLACK_APP_TOKEN" \
+ --set-string 'agents.kiro.slack.allowedChannels[0]=C0123456789'
```
### 3. Authenticate (first time only)
@@ -64,6 +87,8 @@ In your Discord channel:
The bot creates a thread. After that, just type in the thread β no @mention needed.
+**Slack:** `@YourBot explain this code` in a channel β same thread-based workflow as Discord.
+
## Other Agents
| Agent | CLI | ACP Adapter | Guide |
@@ -96,6 +121,12 @@ bot_token = "${DISCORD_BOT_TOKEN}" # supports env var expansion
allowed_channels = ["123456789"] # channel ID allowlist
# allowed_users = ["987654321"] # user ID allowlist (empty = all users)
+[slack]
+bot_token = "${SLACK_BOT_TOKEN}" # Bot User OAuth Token (xoxb-...)
+app_token = "${SLACK_APP_TOKEN}" # App-Level Token (xapp-...) for Socket Mode
+allowed_channels = ["C0123456789"] # channel ID allowlist (empty = allow all)
+# allowed_users = ["U0123456789"] # user ID allowlist (empty = allow all)
+
[agent]
command = "kiro-cli" # CLI command
args = ["acp", "--trust-all-tools"] # ACP mode args
@@ -184,15 +215,18 @@ kubectl apply -f k8s/deployment.yaml
βββ config.toml.example # example config with all agent backends
βββ k8s/ # Kubernetes manifests
βββ src/
- βββ main.rs # entrypoint: tokio + serenity + cleanup + shutdown
+ βββ main.rs # entrypoint: multi-adapter startup, cleanup, shutdown
+ βββ adapter.rs # ChatAdapter trait, AdapterRouter (platform-agnostic)
βββ config.rs # TOML config + ${ENV_VAR} expansion
- βββ discord.rs # Discord bot: mention, threads, edit-streaming
- βββ format.rs # message splitting (2000 char limit)
+ βββ discord.rs # DiscordAdapter: serenity EventHandler + ChatAdapter impl
+ βββ slack.rs # SlackAdapter: Socket Mode + ChatAdapter impl
+ βββ media.rs # shared image resize/compress + STT download
+ βββ format.rs # message splitting, thread name shortening
βββ reactions.rs # status reaction controller (debounce, stall detection)
βββ acp/
βββ protocol.rs # JSON-RPC types + ACP event classification
βββ connection.rs # spawn CLI, stdio JSON-RPC communication
- βββ pool.rs # thread_id β AcpConnection map
+ βββ pool.rs # session key β AcpConnection map
```
## Inspired By
diff --git a/charts/openab/templates/NOTES.txt b/charts/openab/templates/NOTES.txt
index 3c69de9..f03ee06 100644
--- a/charts/openab/templates/NOTES.txt
+++ b/charts/openab/templates/NOTES.txt
@@ -1,15 +1,24 @@
openab {{ .Chart.AppVersion }} has been installed!
-β οΈ Discord channel IDs must be set with --set-string (not --set) to avoid float64 precision loss.
+β οΈ Channel/user IDs must be set with --set-string (not --set) to avoid float64 precision loss.
Agents deployed:
{{- range $name, $cfg := .Values.agents }}
{{- if ne (include "openab.agentEnabled" $cfg) "false" }}
β’ {{ $name }} ({{ $cfg.command }})
-{{- if not $cfg.discord.botToken }}
+{{- if not (or (and ($cfg.discord).enabled ($cfg.discord).botToken) (and ($cfg.slack).enabled ($cfg.slack).botToken)) }}
β οΈ No bot token provided. Create the secret manually:
kubectl create secret generic {{ include "openab.agentFullname" (dict "ctx" $ "agent" $name) }} \
- --from-literal=discord-bot-token="YOUR_TOKEN"
+ --from-literal=discord-bot-token="YOUR_DISCORD_TOKEN"
+{{- end }}
+
+{{- if and ($cfg.discord).enabled ($cfg.discord).botToken }}
+ Discord: β
configured
+{{- end }}
+{{- if and ($cfg.slack).enabled ($cfg.slack).botToken }}
+ Slack: β
configured (Socket Mode)
+ Ensure your Slack app has these bot events: app_mention, message.channels, message.groups
+ Required scopes: app_mentions:read, chat:write, channels:history, groups:history, channels:read, groups:read, reactions:write, files:read
{{- end }}
{{- if eq $cfg.command "kiro-cli" }}
diff --git a/charts/openab/templates/configmap.yaml b/charts/openab/templates/configmap.yaml
index cb7dce8..d90c3c5 100644
--- a/charts/openab/templates/configmap.yaml
+++ b/charts/openab/templates/configmap.yaml
@@ -10,6 +10,7 @@ metadata:
{{- include "openab.labels" $d | nindent 4 }}
data:
config.toml: |
+ {{- if and ($cfg.discord).enabled ($cfg.discord).botToken }}
[discord]
bot_token = "${DISCORD_BOT_TOKEN}"
{{- range $cfg.discord.allowedChannels }}
@@ -41,6 +42,25 @@ data:
{{- if $cfg.discord.trustedBotIds }}
trusted_bot_ids = {{ $cfg.discord.trustedBotIds | toJson }}
{{- end }}
+ {{- end }}
+
+ {{- if and ($cfg.slack).enabled }}
+ [slack]
+ bot_token = "${SLACK_BOT_TOKEN}"
+ app_token = "${SLACK_APP_TOKEN}"
+ {{- range ($cfg.slack).allowedChannels }}
+ {{- if regexMatch "e\\+|E\\+" (toString .) }}
+ {{- fail (printf "slack.allowedChannels contains a mangled ID: %s β use --set-string instead of --set for channel IDs" (toString .)) }}
+ {{- end }}
+ {{- end }}
+ allowed_channels = {{ ($cfg.slack).allowedChannels | default list | toJson }}
+ {{- range ($cfg.slack).allowedUsers }}
+ {{- if regexMatch "e\\+|E\\+" (toString .) }}
+ {{- fail (printf "slack.allowedUsers contains a mangled ID: %s β use --set-string instead of --set for user IDs" (toString .)) }}
+ {{- end }}
+ {{- end }}
+ allowed_users = {{ ($cfg.slack).allowedUsers | default list | toJson }}
+ {{- end }}
[agent]
command = "{{ $cfg.command }}"
diff --git a/charts/openab/templates/deployment.yaml b/charts/openab/templates/deployment.yaml
index b237475..2f895b2 100644
--- a/charts/openab/templates/deployment.yaml
+++ b/charts/openab/templates/deployment.yaml
@@ -38,13 +38,27 @@ spec:
{{- toYaml . | nindent 12 }}
{{- end }}
env:
- {{- if $cfg.discord.botToken }}
+ {{- if and ($cfg.discord).enabled ($cfg.discord).botToken }}
- name: DISCORD_BOT_TOKEN
valueFrom:
secretKeyRef:
name: {{ include "openab.agentFullname" $d }}
key: discord-bot-token
{{- end }}
+ {{- if and ($cfg.slack).enabled ($cfg.slack).botToken }}
+ - name: SLACK_BOT_TOKEN
+ valueFrom:
+ secretKeyRef:
+ name: {{ include "openab.agentFullname" $d }}
+ key: slack-bot-token
+ {{- end }}
+ {{- if and ($cfg.slack).enabled ($cfg.slack).appToken }}
+ - name: SLACK_APP_TOKEN
+ valueFrom:
+ secretKeyRef:
+ name: {{ include "openab.agentFullname" $d }}
+ key: slack-app-token
+ {{- end }}
{{- if and ($cfg.stt).enabled ($cfg.stt).apiKey }}
- name: STT_API_KEY
valueFrom:
diff --git a/charts/openab/templates/secret.yaml b/charts/openab/templates/secret.yaml
index 2cdd27c..d6907fd 100644
--- a/charts/openab/templates/secret.yaml
+++ b/charts/openab/templates/secret.yaml
@@ -1,6 +1,9 @@
{{- range $name, $cfg := .Values.agents }}
{{- if ne (include "openab.agentEnabled" $cfg) "false" }}
-{{- if $cfg.discord.botToken }}
+{{- $hasDiscord := and ($cfg.discord).enabled ($cfg.discord).botToken }}
+{{- $hasSlack := and ($cfg.slack).enabled (or ($cfg.slack).botToken ($cfg.slack).appToken) }}
+{{- $hasStt := and ($cfg.stt).enabled ($cfg.stt).apiKey }}
+{{- if or $hasDiscord $hasSlack $hasStt }}
{{- $d := dict "ctx" $ "agent" $name "cfg" $cfg }}
---
apiVersion: v1
@@ -13,8 +16,16 @@ metadata:
"helm.sh/resource-policy": keep
type: Opaque
data:
+ {{- if $hasDiscord }}
discord-bot-token: {{ $cfg.discord.botToken | b64enc | quote }}
- {{- if and ($cfg.stt).enabled ($cfg.stt).apiKey }}
+ {{- end }}
+ {{- if and ($cfg.slack).enabled ($cfg.slack).botToken }}
+ slack-bot-token: {{ $cfg.slack.botToken | b64enc | quote }}
+ {{- end }}
+ {{- if and ($cfg.slack).enabled ($cfg.slack).appToken }}
+ slack-app-token: {{ $cfg.slack.appToken | b64enc | quote }}
+ {{- end }}
+ {{- if $hasStt }}
stt-api-key: {{ $cfg.stt.apiKey | b64enc | quote }}
{{- end }}
{{- end }}
diff --git a/charts/openab/values.yaml b/charts/openab/values.yaml
index 8ed2ad9..aa9d414 100644
--- a/charts/openab/values.yaml
+++ b/charts/openab/values.yaml
@@ -115,6 +115,7 @@ agents:
- acp
- --trust-all-tools
discord:
+ enabled: true # set to false to disable; enabled with empty botToken is treated as disabled
botToken: ""
# β οΈ Use --set-string for channel IDs to avoid float64 precision loss
allowedChannels:
@@ -126,6 +127,12 @@ agents:
allowBotMessages: "off"
# trustedBotIds: [] # empty = any bot (mode permitting); set to restrict
trustedBotIds: []
+ slack:
+ enabled: false
+ botToken: "" # Bot User OAuth Token (xoxb-...)
+ appToken: "" # App-Level Token (xapp-...) for Socket Mode
+ allowedChannels: [] # empty = allow all channels
+ allowedUsers: [] # empty = allow all users
workingDir: /home/agent
env: {}
envFrom: []
diff --git a/config.toml.example b/config.toml.example
index 48cb44e..927db02 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -1,11 +1,19 @@
+# Enable one or more adapters. At least one [discord] or [slack] section is required.
+
[discord]
bot_token = "${DISCORD_BOT_TOKEN}"
-allowed_channels = ["1234567890"]
+allowed_channels = ["1234567890"] # empty or omitted = allow all channels
# allowed_users = [""] # empty or omitted = allow all users
# allow_bot_messages = "off" # "off" (default) | "mentions" | "all"
# "mentions" is recommended for multi-agent collaboration
# trusted_bot_ids = [] # empty = any bot (mode permitting); set to restrict
+# [slack]
+# bot_token = "${SLACK_BOT_TOKEN}" # Bot User OAuth Token (xoxb-...)
+# app_token = "${SLACK_APP_TOKEN}" # App-Level Token (xapp-...) for Socket Mode
+# allowed_channels = ["C0123456789"] # empty or omitted = allow all channels
+# allowed_users = ["U0123456789"] # empty or omitted = allow all users
+
[agent]
command = "kiro-cli"
args = ["acp", "--trust-all-tools"]
diff --git a/docs/slack-bot-howto.md b/docs/slack-bot-howto.md
new file mode 100644
index 0000000..8fa774e
--- /dev/null
+++ b/docs/slack-bot-howto.md
@@ -0,0 +1,124 @@
+# Slack Bot Setup Guide
+
+Step-by-step guide to create and configure a Slack bot for openab.
+
+## 1. Create a Slack App
+
+1. Go to https://api.slack.com/apps
+2. Click **Create New App** β **From scratch**
+3. Enter an app name (e.g. "OpenAB") and select your workspace
+4. Click **Create App**
+
+## 2. Enable Socket Mode
+
+Socket Mode uses a persistent WebSocket connection β no public URL or ingress needed.
+
+1. In the left sidebar, click **Socket Mode**
+2. Toggle **Enable Socket Mode** to ON
+3. You'll be prompted to generate an **App-Level Token**:
+ - Token name: `openab-socket` (or any name)
+ - Scope: `connections:write`
+ - Click **Generate**
+4. Copy the token (`xapp-...`) β this is your `SLACK_APP_TOKEN`
+
+## 3. Subscribe to Events
+
+1. In the left sidebar, click **Event Subscriptions**
+2. Toggle **Enable Events** to ON
+3. Under **Subscribe to bot events**, add:
+ - `app_mention` β triggers when someone @mentions the bot
+ - `message.channels` β receives messages in public channels (for thread follow-ups)
+ - `message.groups` β receives messages in private channels (for thread follow-ups)
+4. Click **Save Changes**
+
+## 4. Add Bot Token Scopes
+
+1. In the left sidebar, click **OAuth & Permissions**
+2. Under **Bot Token Scopes**, add:
+
+| Scope | Purpose |
+|-------|---------|
+| `app_mentions:read` | Receive @mention events |
+| `chat:write` | Send and edit messages |
+| `channels:history` | Read public channel messages (for thread context) |
+| `groups:history` | Read private channel messages (for thread context) |
+| `channels:read` | List public channels |
+| `groups:read` | List private channels |
+| `reactions:write` | Add/remove emoji reactions |
+| `files:read` | Download file attachments (images, audio) |
+| `users:read` | Resolve user display names |
+
+## 5. Install to Workspace
+
+1. In the left sidebar, click **Install App**
+2. Click **Install to Workspace** (or **Reinstall** if you've changed scopes)
+3. Authorize the requested permissions
+4. Copy the **Bot User OAuth Token** (`xoxb-...`) β this is your `SLACK_BOT_TOKEN`
+
+## 6. Configure openab
+
+Add the `[slack]` section to your `config.toml`:
+
+```toml
+[slack]
+bot_token = "${SLACK_BOT_TOKEN}"
+app_token = "${SLACK_APP_TOKEN}"
+allowed_channels = [] # empty = allow all channels
+# allowed_users = ["U0123456789"] # empty = allow all users
+```
+
+Set the environment variables:
+
+```bash
+export SLACK_BOT_TOKEN="xoxb-..."
+export SLACK_APP_TOKEN="xapp-..."
+```
+
+## 7. Invite the Bot
+
+In each Slack channel where you want to use the bot:
+
+```
+/invite @OpenAB
+```
+
+## 8. Test
+
+In a channel where the bot is invited:
+
+```
+@OpenAB explain this code
+```
+
+The bot will reply in a thread. After that, just type in the thread β no @mention needed for follow-ups.
+
+## Finding Channel and User IDs
+
+- **Channel ID**: Right-click the channel name β **View channel details** β ID at the bottom (starts with `C` for public, `G` for private)
+- **User ID**: Click a user's profile β **...** menu β **Copy member ID** (starts with `U`)
+
+## Troubleshooting
+
+### Bot doesn't respond to @mentions
+
+1. Verify Socket Mode is enabled in your app settings
+2. Check that `app_mention` is subscribed under **bot events** (not user events)
+3. Ensure the app is reinstalled after adding new event subscriptions
+4. Check the bot is invited to the channel (`/invite @YourSlackAppName`)
+5. Run with `RUST_LOG=openab=debug cargo run` to see incoming events
+
+### Bot doesn't respond to thread follow-ups
+
+1. Verify `message.channels` (and `message.groups` for private channels) are subscribed under **bot events**
+2. Reinstall the app after adding these events
+
+### "not_authed" or "invalid_auth" errors
+
+1. Verify your `SLACK_BOT_TOKEN` starts with `xoxb-`
+2. Verify your `SLACK_APP_TOKEN` starts with `xapp-`
+3. Check the tokens haven't been revoked in your app settings
+
+### Reactions not showing
+
+1. Verify `reactions:write` scope is added
+2. Reinstall the app after adding the scope
diff --git a/src/adapter.rs b/src/adapter.rs
new file mode 100644
index 0000000..2c2d096
--- /dev/null
+++ b/src/adapter.rs
@@ -0,0 +1,456 @@
+use anyhow::Result;
+use async_trait::async_trait;
+use serde::Serialize;
+use std::sync::Arc;
+use tokio::sync::watch;
+use tracing::error;
+
+use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool};
+use crate::config::ReactionsConfig;
+use crate::error_display::{format_coded_error, format_user_error};
+use crate::format;
+use crate::reactions::StatusReactionController;
+
+// --- Platform-agnostic types ---
+
+/// Identifies a channel or thread across platforms.
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub struct ChannelRef {
+ pub platform: String,
+ pub channel_id: String,
+ /// Thread within a channel (e.g. Slack thread_ts, Telegram topic_id).
+ /// For Discord, threads are separate channels so this is None.
+ pub thread_id: Option,
+ /// Parent channel if this is a thread-as-channel (Discord).
+ pub parent_id: Option,
+}
+
+/// Identifies a message across platforms.
+#[derive(Clone, Debug)]
+pub struct MessageRef {
+ pub channel: ChannelRef,
+ pub message_id: String,
+}
+
+/// Sender identity injected into prompts for downstream agent context.
+#[derive(Clone, Debug, Serialize)]
+pub struct SenderContext {
+ pub schema: String,
+ pub sender_id: String,
+ pub sender_name: String,
+ pub display_name: String,
+ pub channel: String,
+ pub channel_id: String,
+ pub is_bot: bool,
+}
+
+// --- ChatAdapter trait ---
+
+#[async_trait]
+pub trait ChatAdapter: Send + Sync + 'static {
+ /// Platform name for logging and session key namespacing.
+ fn platform(&self) -> &'static str;
+
+ /// Maximum message length for this platform (e.g. 2000 for Discord, 4000 for Slack).
+ fn message_limit(&self) -> usize;
+
+ /// Send a new message, returns a reference to the sent message.
+ async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result;
+
+ /// Edit an existing message in-place.
+ async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()>;
+
+ /// Create a thread from a trigger message, returns the thread channel ref.
+ async fn create_thread(
+ &self,
+ channel: &ChannelRef,
+ trigger_msg: &MessageRef,
+ title: &str,
+ ) -> Result;
+
+ /// Add a reaction/emoji to a message.
+ async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>;
+
+ /// Remove a reaction/emoji from a message.
+ async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>;
+}
+
+// --- AdapterRouter ---
+
+/// Shared logic for routing messages to ACP agents, managing sessions,
+/// streaming edits, and controlling reactions. Platform-independent.
+pub struct AdapterRouter {
+ pool: Arc,
+ reactions_config: ReactionsConfig,
+}
+
+impl AdapterRouter {
+ pub fn new(pool: Arc, reactions_config: ReactionsConfig) -> Self {
+ Self {
+ pool,
+ reactions_config,
+ }
+ }
+
+ /// Handle an incoming user message. The adapter is responsible for
+ /// filtering, resolving the thread, and building the SenderContext.
+ /// This method handles sender context injection, session management, and streaming.
+ pub async fn handle_message(
+ &self,
+ adapter: &Arc,
+ thread_channel: &ChannelRef,
+ sender: &SenderContext,
+ prompt: &str,
+ extra_blocks: Vec,
+ trigger_msg: &MessageRef,
+ ) -> Result<()> {
+ tracing::debug!(platform = adapter.platform(), "processing message");
+
+ // Build content blocks: sender context + prompt text, then extra (images, transcripts)
+ let sender_json = serde_json::to_string(sender).unwrap();
+ let prompt_with_sender = format!(
+ "\n{}\n\n\n{}",
+ sender_json, prompt
+ );
+
+ let mut content_blocks = Vec::with_capacity(1 + extra_blocks.len());
+ // Prepend any transcript blocks (they go before the text block)
+ for block in &extra_blocks {
+ if matches!(block, ContentBlock::Text { .. }) {
+ content_blocks.push(block.clone());
+ }
+ }
+ content_blocks.push(ContentBlock::Text {
+ text: prompt_with_sender,
+ });
+ // Append non-text blocks (images)
+ for block in extra_blocks {
+ if !matches!(block, ContentBlock::Text { .. }) {
+ content_blocks.push(block);
+ }
+ }
+
+ let thinking_msg = adapter.send_message(thread_channel, "...").await?;
+
+ let thread_key = format!(
+ "{}:{}",
+ adapter.platform(),
+ thread_channel
+ .thread_id
+ .as_deref()
+ .unwrap_or(&thread_channel.channel_id)
+ );
+
+ if let Err(e) = self.pool.get_or_create(&thread_key).await {
+ let msg = format_user_error(&e.to_string());
+ let _ = adapter
+ .edit_message(&thinking_msg, &format!("β οΈ {msg}"))
+ .await;
+ error!("pool error: {e}");
+ return Err(e);
+ }
+
+ let reactions = Arc::new(StatusReactionController::new(
+ self.reactions_config.enabled,
+ adapter.clone(),
+ trigger_msg.clone(),
+ self.reactions_config.emojis.clone(),
+ self.reactions_config.timing.clone(),
+ ));
+ reactions.set_queued().await;
+
+ let result = self
+ .stream_prompt(
+ adapter,
+ &thread_key,
+ content_blocks,
+ thread_channel,
+ &thinking_msg,
+ reactions.clone(),
+ )
+ .await;
+
+ match &result {
+ Ok(()) => reactions.set_done().await,
+ Err(_) => reactions.set_error().await,
+ }
+
+ let hold_ms = if result.is_ok() {
+ self.reactions_config.timing.done_hold_ms
+ } else {
+ self.reactions_config.timing.error_hold_ms
+ };
+ if self.reactions_config.remove_after_reply {
+ let reactions = reactions;
+ tokio::spawn(async move {
+ tokio::time::sleep(std::time::Duration::from_millis(hold_ms)).await;
+ reactions.clear().await;
+ });
+ }
+
+ if let Err(ref e) = result {
+ let _ = adapter
+ .edit_message(&thinking_msg, &format!("β οΈ {e}"))
+ .await;
+ }
+
+ result
+ }
+
+ async fn stream_prompt(
+ &self,
+ adapter: &Arc,
+ thread_key: &str,
+ content_blocks: Vec,
+ thread_channel: &ChannelRef,
+ thinking_msg: &MessageRef,
+ reactions: Arc,
+ ) -> Result<()> {
+ let adapter = adapter.clone();
+ let thread_channel = thread_channel.clone();
+ let msg_ref = thinking_msg.clone();
+ let message_limit = adapter.message_limit();
+
+ self.pool
+ .with_connection(thread_key, |conn| {
+ let content_blocks = content_blocks.clone();
+ Box::pin(async move {
+ let reset = conn.session_reset;
+ conn.session_reset = false;
+
+ let (mut rx, _) = conn.session_prompt(content_blocks).await?;
+ reactions.set_thinking().await;
+
+ let initial = if reset {
+ "β οΈ _Session expired, starting fresh..._\n\n...".to_string()
+ } else {
+ "...".to_string()
+ };
+ let (buf_tx, buf_rx) = watch::channel(initial);
+
+ let mut text_buf = String::new();
+ let mut tool_lines: Vec = Vec::new();
+
+ if reset {
+ text_buf.push_str("β οΈ _Session expired, starting fresh..._\n\n");
+ }
+
+ // Spawn edit-streaming task β only edits the single message, never sends new ones.
+ // Long content is truncated during streaming; final multi-message split happens after.
+ let streaming_limit = message_limit.saturating_sub(100);
+ let edit_handle = {
+ let adapter = adapter.clone();
+ let msg_ref = msg_ref.clone();
+ let mut buf_rx = buf_rx.clone();
+ tokio::spawn(async move {
+ let mut last_content = String::new();
+ loop {
+ tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
+ if buf_rx.has_changed().unwrap_or(false) {
+ let content = buf_rx.borrow_and_update().clone();
+ if content != last_content {
+ let display = if content.chars().count() > streaming_limit {
+ // Tail-priority: keep the last N chars so user
+ // sees the most recent agent output
+ let total = content.chars().count();
+ let skip = total - streaming_limit;
+ let truncated: String = content.chars().skip(skip).collect();
+ format!("β¦(truncated)\n{truncated}")
+ } else {
+ content.clone()
+ };
+ let _ = adapter.edit_message(&msg_ref, &display).await;
+ last_content = content;
+ }
+ }
+ if buf_rx.has_changed().is_err() {
+ break;
+ }
+ }
+ })
+ };
+
+ // Process ACP notifications
+ let mut got_first_text = false;
+ let mut response_error: Option = None;
+ while let Some(notification) = rx.recv().await {
+ if notification.id.is_some() {
+ if let Some(ref err) = notification.error {
+ response_error = Some(format_coded_error(err.code, &err.message));
+ }
+ break;
+ }
+
+ if let Some(event) = classify_notification(¬ification) {
+ match event {
+ AcpEvent::Text(t) => {
+ if !got_first_text {
+ got_first_text = true;
+ }
+ text_buf.push_str(&t);
+ let _ =
+ buf_tx.send(compose_display(&tool_lines, &text_buf, true));
+ }
+ AcpEvent::Thinking => {
+ reactions.set_thinking().await;
+ }
+ AcpEvent::ToolStart { id, title } if !title.is_empty() => {
+ reactions.set_tool(&title).await;
+ let title = sanitize_title(&title);
+ if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) {
+ slot.title = title;
+ slot.state = ToolState::Running;
+ } else {
+ tool_lines.push(ToolEntry {
+ id,
+ title,
+ state: ToolState::Running,
+ });
+ }
+ let _ =
+ buf_tx.send(compose_display(&tool_lines, &text_buf, true));
+ }
+ AcpEvent::ToolDone { id, title, status } => {
+ reactions.set_thinking().await;
+ let new_state = if status == "completed" {
+ ToolState::Completed
+ } else {
+ ToolState::Failed
+ };
+ if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) {
+ if !title.is_empty() {
+ slot.title = sanitize_title(&title);
+ }
+ slot.state = new_state;
+ } else if !title.is_empty() {
+ tool_lines.push(ToolEntry {
+ id,
+ title: sanitize_title(&title),
+ state: new_state,
+ });
+ }
+ let _ =
+ buf_tx.send(compose_display(&tool_lines, &text_buf, true));
+ }
+ _ => {}
+ }
+ }
+ }
+
+ conn.prompt_done().await;
+ drop(buf_tx);
+ let _ = edit_handle.await;
+
+ // Final edit with complete content
+ let final_content = compose_display(&tool_lines, &text_buf, false);
+ let final_content = if final_content.is_empty() {
+ if let Some(err) = response_error {
+ format!("β οΈ {err}")
+ } else {
+ "_(no response)_".to_string()
+ }
+ } else if let Some(err) = response_error {
+ format!("β οΈ {err}\n\n{final_content}")
+ } else {
+ final_content
+ };
+
+ let chunks = format::split_message(&final_content, message_limit);
+ let mut current_msg = msg_ref;
+ for (i, chunk) in chunks.iter().enumerate() {
+ if i == 0 {
+ let _ = adapter.edit_message(¤t_msg, chunk).await;
+ } else if let Ok(new_msg) =
+ adapter.send_message(&thread_channel, chunk).await
+ {
+ current_msg = new_msg;
+ }
+ }
+
+ Ok(())
+ })
+ })
+ .await
+ }
+}
+
+/// Flatten a tool-call title into a single line safe for inline-code spans.
+fn sanitize_title(title: &str) -> String {
+ title.replace('\r', "").replace('\n', " ; ").replace('`', "'")
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum ToolState {
+ Running,
+ Completed,
+ Failed,
+}
+
+#[derive(Debug, Clone)]
+struct ToolEntry {
+ id: String,
+ title: String,
+ state: ToolState,
+}
+
+impl ToolEntry {
+ fn render(&self) -> String {
+ let icon = match self.state {
+ ToolState::Running => "π§",
+ ToolState::Completed => "β
",
+ ToolState::Failed => "β",
+ };
+ let suffix = if self.state == ToolState::Running { "..." } else { "" };
+ format!("{icon} `{}`{}", self.title, suffix)
+ }
+}
+
+/// Maximum number of finished tool entries to show individually
+/// during streaming before collapsing into a summary line.
+const TOOL_COLLAPSE_THRESHOLD: usize = 3;
+
+fn compose_display(tool_lines: &[ToolEntry], text: &str, streaming: bool) -> String {
+ let mut out = String::new();
+ if !tool_lines.is_empty() {
+ if streaming {
+ let done = tool_lines.iter().filter(|e| e.state == ToolState::Completed).count();
+ let failed = tool_lines.iter().filter(|e| e.state == ToolState::Failed).count();
+ let running: Vec<_> = tool_lines.iter().filter(|e| e.state == ToolState::Running).collect();
+ let finished = done + failed;
+
+ if finished <= TOOL_COLLAPSE_THRESHOLD {
+ for entry in tool_lines.iter().filter(|e| e.state != ToolState::Running) {
+ out.push_str(&entry.render());
+ out.push('\n');
+ }
+ } else {
+ let mut parts = Vec::new();
+ if done > 0 { parts.push(format!("β
{done}")); }
+ if failed > 0 { parts.push(format!("β {failed}")); }
+ out.push_str(&format!("{} tool(s) completed\n", parts.join(" Β· ")));
+ }
+
+ if running.len() <= TOOL_COLLAPSE_THRESHOLD {
+ for entry in &running {
+ out.push_str(&entry.render());
+ out.push('\n');
+ }
+ } else {
+ let hidden = running.len() - TOOL_COLLAPSE_THRESHOLD;
+ out.push_str(&format!("π§ {hidden} more running\n"));
+ for entry in running.iter().skip(hidden) {
+ out.push_str(&entry.render());
+ out.push('\n');
+ }
+ }
+ } else {
+ for entry in tool_lines {
+ out.push_str(&entry.render());
+ out.push('\n');
+ }
+ }
+ if !out.is_empty() { out.push('\n'); }
+ }
+ out.push_str(text.trim_end());
+ out
+}
diff --git a/src/config.rs b/src/config.rs
index 658e00b..46efb1c 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -33,7 +33,8 @@ impl<'de> Deserialize<'de> for AllowBots {
#[derive(Debug, Deserialize)]
pub struct Config {
- pub discord: DiscordConfig,
+ pub discord: Option,
+ pub slack: Option,
pub agent: AgentConfig,
#[serde(default)]
pub pool: PoolConfig,
@@ -87,6 +88,16 @@ pub struct DiscordConfig {
pub trusted_bot_ids: Vec,
}
+#[derive(Debug, Deserialize)]
+pub struct SlackConfig {
+ pub bot_token: String,
+ pub app_token: String,
+ #[serde(default)]
+ pub allowed_channels: Vec,
+ #[serde(default)]
+ pub allowed_users: Vec,
+}
+
#[derive(Debug, Deserialize)]
pub struct AgentConfig {
pub command: String,
diff --git a/src/discord.rs b/src/discord.rs
index 278d3bf..ea02b7c 100644
--- a/src/discord.rs
+++ b/src/discord.rs
@@ -1,53 +1,132 @@
-use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool};
-use crate::config::{AllowBots, ReactionsConfig, SttConfig};
-use crate::error_display::{format_coded_error, format_user_error};
+use crate::acp::ContentBlock;
+use crate::adapter::{AdapterRouter, ChatAdapter, ChannelRef, MessageRef, SenderContext};
+use crate::config::{AllowBots, SttConfig};
use crate::format;
-use crate::reactions::StatusReactionController;
-use base64::engine::general_purpose::STANDARD as BASE64;
-use base64::Engine;
-use image::ImageReader;
-use std::io::Cursor;
+use crate::media;
+use async_trait::async_trait;
use std::sync::LazyLock;
-use serenity::async_trait;
-use serenity::model::channel::{Message, ReactionType};
+use serenity::builder::{CreateThread, EditMessage};
+use serenity::http::Http;
+use serenity::model::channel::{AutoArchiveDuration, Message, ReactionType};
use serenity::model::gateway::Ready;
use serenity::model::id::{ChannelId, MessageId, UserId};
use serenity::model::user::User;
use serenity::prelude::*;
use std::collections::HashSet;
-use std::sync::Arc;
-use tokio::sync::watch;
-use tracing::{debug, error, info, warn};
+use std::sync::{Arc, OnceLock};
+use tracing::{debug, error, info};
-/// Hard cap on consecutive bot messages (from any other bot) in a
-/// channel or thread. When this many recent messages are all from
-/// bots other than ourselves, we stop responding to prevent runaway
-/// loops between multiple bots in "all" mode.
-///
-/// Note: must be β€ 255 because Serenity's `GetMessages::limit()` takes `u8`.
-/// Inspired by OpenClaw's `session.agentToAgent.maxPingPongTurns`.
+/// Hard cap on consecutive bot messages in a channel or thread.
+/// Prevents runaway loops between multiple bots in "all" mode.
const MAX_CONSECUTIVE_BOT_TURNS: u8 = 10;
-/// Reusable HTTP client for downloading Discord attachments.
-/// Built once with a 30s timeout and rustls TLS (no native-tls deps).
-static HTTP_CLIENT: LazyLock = LazyLock::new(|| {
- reqwest::Client::builder()
- .timeout(std::time::Duration::from_secs(30))
- .build()
- .expect("static HTTP client must build")
-});
+// --- DiscordAdapter: implements ChatAdapter for Discord via serenity ---
+
+pub struct DiscordAdapter {
+ http: Arc,
+}
+
+impl DiscordAdapter {
+ pub fn new(http: Arc) -> Self {
+ Self { http }
+ }
+}
+
+#[async_trait]
+impl ChatAdapter for DiscordAdapter {
+ fn platform(&self) -> &'static str {
+ "discord"
+ }
+
+ fn message_limit(&self) -> usize {
+ 2000
+ }
+
+ async fn send_message(&self, channel: &ChannelRef, content: &str) -> anyhow::Result {
+ let ch_id: u64 = channel.channel_id.parse()?;
+ let msg = ChannelId::new(ch_id).say(&self.http, content).await?;
+ Ok(MessageRef {
+ channel: channel.clone(),
+ message_id: msg.id.to_string(),
+ })
+ }
+
+ async fn edit_message(&self, msg: &MessageRef, content: &str) -> anyhow::Result<()> {
+ let ch_id: u64 = msg.channel.channel_id.parse()?;
+ let msg_id: u64 = msg.message_id.parse()?;
+ ChannelId::new(ch_id)
+ .edit_message(
+ &self.http,
+ MessageId::new(msg_id),
+ EditMessage::new().content(content),
+ )
+ .await?;
+ Ok(())
+ }
+
+ async fn create_thread(
+ &self,
+ channel: &ChannelRef,
+ trigger_msg: &MessageRef,
+ title: &str,
+ ) -> anyhow::Result {
+ let ch_id: u64 = channel.channel_id.parse()?;
+ let msg_id: u64 = trigger_msg.message_id.parse()?;
+ let thread = ChannelId::new(ch_id)
+ .create_thread_from_message(
+ &self.http,
+ MessageId::new(msg_id),
+ CreateThread::new(title).auto_archive_duration(AutoArchiveDuration::OneDay),
+ )
+ .await?;
+ Ok(ChannelRef {
+ platform: "discord".into(),
+ channel_id: thread.id.to_string(),
+ thread_id: None,
+ parent_id: Some(channel.channel_id.clone()),
+ })
+ }
+
+ async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> anyhow::Result<()> {
+ let ch_id: u64 = msg.channel.channel_id.parse()?;
+ let msg_id: u64 = msg.message_id.parse()?;
+ self.http
+ .create_reaction(
+ ChannelId::new(ch_id),
+ MessageId::new(msg_id),
+ &ReactionType::Unicode(emoji.to_string()),
+ )
+ .await?;
+ Ok(())
+ }
+
+ async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> anyhow::Result<()> {
+ let ch_id: u64 = msg.channel.channel_id.parse()?;
+ let msg_id: u64 = msg.message_id.parse()?;
+ self.http
+ .delete_reaction_me(
+ ChannelId::new(ch_id),
+ MessageId::new(msg_id),
+ &ReactionType::Unicode(emoji.to_string()),
+ )
+ .await?;
+ Ok(())
+ }
+}
+
+// --- Handler: serenity EventHandler that delegates to AdapterRouter ---
pub struct Handler {
- pub pool: Arc,
+ pub router: Arc,
pub allowed_channels: HashSet,
pub allowed_users: HashSet,
- pub reactions_config: ReactionsConfig,
pub stt_config: SttConfig,
+ pub adapter: OnceLock>,
pub allow_bot_messages: AllowBots,
pub trusted_bot_ids: HashSet,
}
-#[async_trait]
+#[serenity::async_trait]
impl EventHandler for Handler {
async fn message(&self, ctx: Context, msg: Message) {
let bot_id = ctx.cache.current_user().id;
@@ -57,34 +136,27 @@ impl EventHandler for Handler {
return;
}
+ let adapter = self.adapter.get_or_init(|| {
+ Arc::new(DiscordAdapter::new(ctx.http.clone()))
+ }).clone();
+
let channel_id = msg.channel_id.get();
let in_allowed_channel =
self.allowed_channels.is_empty() || self.allowed_channels.contains(&channel_id);
let is_mentioned = msg.mentions_user_id(bot_id)
|| msg.content.contains(&format!("<@{}>", bot_id))
- || msg.mention_roles.iter().any(|r| msg.content.contains(&format!("<@&{}>", r)));
+ || msg
+ .mention_roles
+ .iter()
+ .any(|r| msg.content.contains(&format!("<@&{}>", r)));
- // Bot message gating β runs after self-ignore but before channel/user
- // allowlist checks. This ordering is intentional: channel checks below
- // apply uniformly to both human and bot messages, so a bot mention in
- // a non-allowed channel is still rejected by the channel check.
+ // Bot message gating (from upstream #321)
if msg.author.bot {
match self.allow_bot_messages {
AllowBots::Off => return,
AllowBots::Mentions => if !is_mentioned { return; },
AllowBots::All => {
- // Safety net: count consecutive messages from any bot
- // (excluding ourselves) in recent history. If all recent
- // messages are from other bots, we've likely entered a
- // loop. This counts *all* other-bot messages, not just
- // one specific bot β so 3 bots taking turns still hits
- // the cap (which is intentionally conservative).
- //
- // Try cache first to avoid an API call on every bot
- // message. Fall back to API on cache miss. If both fail,
- // reject the message (fail-closed) to avoid unbounded
- // loops during Discord API outages.
let cap = MAX_CONSECUTIVE_BOT_TURNS as usize;
let history = ctx.cache.channel_messages(msg.channel_id)
.map(|msgs| {
@@ -92,7 +164,7 @@ impl EventHandler for Handler {
.filter(|(mid, _)| **mid < msg.id)
.map(|(_, m)| m.clone())
.collect();
- recent.sort_unstable_by(|a, b| b.id.cmp(&a.id)); // newest first
+ recent.sort_unstable_by(|a, b| b.id.cmp(&a.id));
recent.truncate(cap);
recent
})
@@ -123,7 +195,6 @@ impl EventHandler for Handler {
},
}
- // If trusted_bot_ids is set, only allow bots on the list
if !self.trusted_bot_ids.is_empty() && !self.trusted_bot_ids.contains(&msg.author.id.get()) {
tracing::debug!(bot_id = %msg.author.id, "bot not in trusted_bot_ids, ignoring");
return;
@@ -161,9 +232,8 @@ impl EventHandler for Handler {
if !self.allowed_users.is_empty() && !self.allowed_users.contains(&msg.author.id.get()) {
tracing::info!(user_id = %msg.author.id, "denied user, ignoring");
- if let Err(e) = msg.react(&ctx.http, ReactionType::Unicode("π«".into())).await {
- tracing::warn!(error = %e, "failed to react with π«");
- }
+ let msg_ref = discord_msg_ref(&msg);
+ let _ = adapter.add_reaction(&msg_ref, "π«").await;
return;
}
@@ -173,86 +243,80 @@ impl EventHandler for Handler {
msg.content.trim().to_string()
};
- // No text and no image attachments β skip to avoid wasting session slots
+ // No text and no attachments β skip
if prompt.is_empty() && msg.attachments.is_empty() {
return;
}
- // Build content blocks: text + image attachments
- let mut content_blocks = vec![];
-
- // Inject structured sender context so the downstream CLI can identify who sent the message
- let display_name = msg.member.as_ref()
+ let display_name = msg
+ .member
+ .as_ref()
.and_then(|m| m.nick.as_ref())
.unwrap_or(&msg.author.name);
- let sender_ctx = serde_json::json!({
- "schema": "openab.sender.v1",
- "sender_id": msg.author.id.to_string(),
- "sender_name": msg.author.name,
- "display_name": display_name,
- "channel": "discord",
- "channel_id": msg.channel_id.to_string(),
- "is_bot": msg.author.bot,
- });
- let prompt_with_sender = format!(
- "\n{}\n\n\n{}",
- serde_json::to_string(&sender_ctx).unwrap(),
- prompt
- );
-
- // Add text block (always, even if empty, we still send for sender context)
- content_blocks.push(ContentBlock::Text {
- text: prompt_with_sender.clone(),
- });
+ let sender = SenderContext {
+ schema: "openab.sender.v1".into(),
+ sender_id: msg.author.id.to_string(),
+ sender_name: msg.author.name.clone(),
+ display_name: display_name.to_string(),
+ channel: "discord".into(),
+ channel_id: msg.channel_id.to_string(),
+ is_bot: msg.author.bot,
+ };
- // Process attachments: route by content type (audio β STT, image β encode)
- let mut audio_skipped = false;
- if !msg.attachments.is_empty() {
- for attachment in &msg.attachments {
- if is_audio_attachment(attachment) {
- if self.stt_config.enabled {
- if let Some(transcript) = download_and_transcribe(attachment, &self.stt_config).await {
- debug!(filename = %attachment.filename, chars = transcript.len(), "voice transcript injected");
- content_blocks.insert(0, ContentBlock::Text {
- text: format!("[Voice message transcript]: {transcript}"),
- });
- }
- } else {
- warn!(filename = %attachment.filename, "skipping audio attachment (STT disabled)");
- audio_skipped = true;
+ // Build extra content blocks from attachments (images, audio)
+ let mut extra_blocks = Vec::new();
+ for attachment in &msg.attachments {
+ let mime = attachment.content_type.as_deref().unwrap_or("");
+ if media::is_audio_mime(mime) {
+ if self.stt_config.enabled {
+ let mime_clean = mime.split(';').next().unwrap_or(mime).trim();
+ if let Some(transcript) = media::download_and_transcribe(
+ &attachment.url,
+ &attachment.filename,
+ mime_clean,
+ u64::from(attachment.size),
+ &self.stt_config,
+ None,
+ ).await {
+ debug!(filename = %attachment.filename, chars = transcript.len(), "voice transcript injected");
+ extra_blocks.insert(0, ContentBlock::Text {
+ text: format!("[Voice message transcript]: {transcript}"),
+ });
}
- } else if let Some(content_block) = download_and_encode_image(attachment).await {
- debug!(url = %attachment.url, filename = %attachment.filename, "adding image attachment");
- content_blocks.push(content_block);
+ } else {
+ tracing::warn!(filename = %attachment.filename, "skipping audio attachment (STT disabled)");
+ let msg_ref = discord_msg_ref(&msg);
+ let _ = adapter.add_reaction(&msg_ref, "π€").await;
}
- }
- }
-
- // If audio was skipped, react with π€ so the user knows their voice message was noticed
- if audio_skipped {
- let _ = msg.react(&ctx.http, ReactionType::Unicode("π€".into())).await;
- // Voice-only message: no text and only the sender_context block β early return
- if prompt.is_empty() && content_blocks.len() == 1 {
- return;
+ } else if let Some(block) = media::download_and_encode_image(
+ &attachment.url,
+ attachment.content_type.as_deref(),
+ &attachment.filename,
+ u64::from(attachment.size),
+ None,
+ ).await {
+ debug!(url = %attachment.url, filename = %attachment.filename, "adding image attachment");
+ extra_blocks.push(block);
}
}
tracing::debug!(
- text_len = prompt_with_sender.len(),
+ num_extra_blocks = extra_blocks.len(),
num_attachments = msg.attachments.len(),
in_thread,
"processing"
);
- // Note: image-only messages (no text) are intentionally allowed since
- // prompt_with_sender always includes the non-empty sender_context XML.
- // The guard above (prompt.is_empty() && no attachments) handles stickers/embeds.
-
- let thread_id = if in_thread {
- msg.channel_id.get()
+ let thread_channel = if in_thread {
+ ChannelRef {
+ platform: "discord".into(),
+ channel_id: msg.channel_id.get().to_string(),
+ thread_id: None,
+ parent_id: None,
+ }
} else {
- match get_or_create_thread(&ctx, &msg, &prompt).await {
- Ok(id) => id,
+ match get_or_create_thread(&ctx, &adapter, &msg, &prompt).await {
+ Ok(ch) => ch,
Err(e) => {
error!("failed to create thread: {e}");
return;
@@ -260,68 +324,14 @@ impl EventHandler for Handler {
}
};
- let thread_channel = ChannelId::new(thread_id);
-
- let thinking_msg = match thread_channel.say(&ctx.http, "...").await {
- Ok(m) => m,
- Err(e) => {
- error!("failed to post: {e}");
- return;
- }
- };
-
- let thread_key = thread_id.to_string();
- if let Err(e) = self.pool.get_or_create(&thread_key).await {
- let msg = format_user_error(&e.to_string());
- let _ = edit(&ctx, thread_channel, thinking_msg.id, &format!("β οΈ {}", msg)).await;
- error!("pool error: {e}");
- return;
- }
-
- // Create reaction controller on the user's original message
- let reactions = Arc::new(StatusReactionController::new(
- self.reactions_config.enabled,
- ctx.http.clone(),
- msg.channel_id,
- msg.id,
- self.reactions_config.emojis.clone(),
- self.reactions_config.timing.clone(),
- ));
- reactions.set_queued().await;
-
- // Stream prompt with live edits (pass content blocks instead of just text)
- let result = stream_prompt(
- &self.pool,
- &thread_key,
- content_blocks,
- &ctx,
- thread_channel,
- thinking_msg.id,
- reactions.clone(),
- )
- .await;
-
- match &result {
- Ok(()) => reactions.set_done().await,
- Err(_) => reactions.set_error().await,
- }
-
- // Hold emoji briefly then clear
- let hold_ms = if result.is_ok() {
- self.reactions_config.timing.done_hold_ms
- } else {
- self.reactions_config.timing.error_hold_ms
- };
- if self.reactions_config.remove_after_reply {
- let reactions = reactions;
- tokio::spawn(async move {
- tokio::time::sleep(std::time::Duration::from_millis(hold_ms)).await;
- reactions.clear().await;
- });
- }
+ let trigger_msg = discord_msg_ref(&msg);
- if let Err(e) = result {
- let _ = edit(&ctx, thread_channel, thinking_msg.id, &format!("β οΈ {e}")).await;
+ if let Err(e) = self
+ .router
+ .handle_message(&adapter, &thread_channel, &sender, &prompt, extra_blocks, &trigger_msg)
+ .await
+ {
+ error!("handle_message error: {e}");
}
}
@@ -330,465 +340,47 @@ impl EventHandler for Handler {
}
}
-/// Check if an attachment is an audio file (voice messages are typically audio/ogg).
-fn is_audio_attachment(attachment: &serenity::model::channel::Attachment) -> bool {
- let mime = attachment.content_type.as_deref().unwrap_or("");
- mime.starts_with("audio/")
-}
-
-/// Download an audio attachment and transcribe it via the configured STT provider.
-async fn download_and_transcribe(
- attachment: &serenity::model::channel::Attachment,
- stt_config: &SttConfig,
-) -> Option {
- const MAX_SIZE: u64 = 25 * 1024 * 1024; // 25 MB (Whisper API limit)
-
- if u64::from(attachment.size) > MAX_SIZE {
- error!(filename = %attachment.filename, size = attachment.size, "audio exceeds 25MB limit");
- return None;
- }
-
- let resp = HTTP_CLIENT.get(&attachment.url).send().await.ok()?;
- if !resp.status().is_success() {
- error!(url = %attachment.url, status = %resp.status(), "audio download failed");
- return None;
+// --- Discord-specific helpers ---
+
+fn discord_msg_ref(msg: &Message) -> MessageRef {
+ MessageRef {
+ channel: ChannelRef {
+ platform: "discord".into(),
+ channel_id: msg.channel_id.get().to_string(),
+ thread_id: None,
+ parent_id: None,
+ },
+ message_id: msg.id.to_string(),
}
- let bytes = resp.bytes().await.ok()?.to_vec();
-
- let mime_type = attachment.content_type.as_deref().unwrap_or("audio/ogg");
- let mime_type = mime_type.split(';').next().unwrap_or(mime_type).trim();
-
- crate::stt::transcribe(&HTTP_CLIENT, stt_config, bytes, attachment.filename.clone(), mime_type).await
}
-/// Maximum dimension (width or height) for resized images.
-/// Matches OpenClaw's DEFAULT_IMAGE_MAX_DIMENSION_PX.
-const IMAGE_MAX_DIMENSION_PX: u32 = 1200;
-
-/// JPEG quality for compressed output (OpenClaw uses progressive 85β35;
-/// we start at 75 which is a good balance of quality vs size).
-const IMAGE_JPEG_QUALITY: u8 = 75;
-
-/// Download a Discord image attachment, resize/compress it, then base64-encode
-/// as an ACP image content block.
-///
-/// Large images are resized so the longest side is at most 1200px and
-/// re-encoded as JPEG at quality 75. This keeps the base64 payload well
-/// under typical JSON-RPC transport limits (~200-400KB after encoding).
-async fn download_and_encode_image(attachment: &serenity::model::channel::Attachment) -> Option {
- const MAX_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
-
- let url = &attachment.url;
- if url.is_empty() {
- return None;
- }
-
- // Determine media type β prefer content-type header, fallback to extension
- let media_type = attachment
- .content_type
- .as_deref()
- .or_else(|| {
- attachment
- .filename
- .rsplit('.')
- .next()
- .and_then(|ext| match ext.to_lowercase().as_str() {
- "png" => Some("image/png"),
- "jpg" | "jpeg" => Some("image/jpeg"),
- "gif" => Some("image/gif"),
- "webp" => Some("image/webp"),
- _ => None,
- })
- });
-
- let Some(mime) = media_type else {
- debug!(filename = %attachment.filename, "skipping non-image attachment");
- return None;
- };
- let mime = mime.split(';').next().unwrap_or(mime).trim();
- if !mime.starts_with("image/") {
- debug!(filename = %attachment.filename, mime = %mime, "skipping non-image attachment");
- return None;
- }
-
- if u64::from(attachment.size) > MAX_SIZE {
- error!(filename = %attachment.filename, size = attachment.size, "image exceeds 10MB limit");
- return None;
- }
-
- let response = match HTTP_CLIENT.get(url).send().await {
- Ok(resp) => resp,
- Err(e) => { error!(url = %url, error = %e, "download failed"); return None; }
- };
- if !response.status().is_success() {
- error!(url = %url, status = %response.status(), "HTTP error downloading image");
- return None;
- }
- let bytes = match response.bytes().await {
- Ok(b) => b,
- Err(e) => { error!(url = %url, error = %e, "read failed"); return None; }
- };
-
- // Defense-in-depth: verify actual download size
- if bytes.len() as u64 > MAX_SIZE {
- error!(filename = %attachment.filename, size = bytes.len(), "downloaded image exceeds limit");
- return None;
- }
-
- // Resize and compress
- let (output_bytes, output_mime) = match resize_and_compress(&bytes) {
- Ok(result) => result,
- Err(e) => {
- // Fallback: use original bytes but reject if too large for transport
- if bytes.len() > 1024 * 1024 {
- error!(filename = %attachment.filename, error = %e, size = bytes.len(), "resize failed and original too large, skipping");
- return None;
- }
- debug!(filename = %attachment.filename, error = %e, "resize failed, using original");
- (bytes.to_vec(), mime.to_string())
+async fn get_or_create_thread(
+ ctx: &Context,
+ adapter: &Arc,
+ msg: &Message,
+ prompt: &str,
+) -> anyhow::Result {
+ let channel = msg.channel_id.to_channel(&ctx.http).await?;
+ if let serenity::model::channel::Channel::Guild(ref gc) = channel {
+ if gc.thread_metadata.is_some() {
+ return Ok(ChannelRef {
+ platform: "discord".into(),
+ channel_id: msg.channel_id.get().to_string(),
+ thread_id: None,
+ parent_id: None,
+ });
}
- };
-
- debug!(
- filename = %attachment.filename,
- original_size = bytes.len(),
- compressed_size = output_bytes.len(),
- "image processed"
- );
-
- let encoded = BASE64.encode(&output_bytes);
- Some(ContentBlock::Image {
- media_type: output_mime,
- data: encoded,
- })
-}
-
-/// Resize image so longest side β€ IMAGE_MAX_DIMENSION_PX, then encode as JPEG.
-/// Returns (compressed_bytes, mime_type). GIFs are passed through unchanged
-/// to preserve animation.
-fn resize_and_compress(raw: &[u8]) -> Result<(Vec, String), image::ImageError> {
- let reader = ImageReader::new(Cursor::new(raw))
- .with_guessed_format()?;
-
- let format = reader.format();
-
- // Pass through GIFs unchanged to preserve animation
- if format == Some(image::ImageFormat::Gif) {
- return Ok((raw.to_vec(), "image/gif".to_string()));
}
- let img = reader.decode()?;
- let (w, h) = (img.width(), img.height());
-
- // Resize preserving aspect ratio: scale so longest side = 1200px
- let img = if w > IMAGE_MAX_DIMENSION_PX || h > IMAGE_MAX_DIMENSION_PX {
- let max_side = std::cmp::max(w, h);
- let ratio = f64::from(IMAGE_MAX_DIMENSION_PX) / f64::from(max_side);
- let new_w = (f64::from(w) * ratio) as u32;
- let new_h = (f64::from(h) * ratio) as u32;
- img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3)
- } else {
- img
+ let thread_name = format::shorten_thread_name(prompt);
+ let parent = ChannelRef {
+ platform: "discord".into(),
+ channel_id: msg.channel_id.get().to_string(),
+ thread_id: None,
+ parent_id: None,
};
-
- // Encode as JPEG
- let mut buf = Cursor::new(Vec::new());
- let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, IMAGE_JPEG_QUALITY);
- img.write_with_encoder(encoder)?;
-
- Ok((buf.into_inner(), "image/jpeg".to_string()))
-}
-
-async fn edit(ctx: &Context, ch: ChannelId, msg_id: MessageId, content: &str) -> serenity::Result {
- ch.edit_message(&ctx.http, msg_id, serenity::builder::EditMessage::new().content(content)).await
-}
-
-async fn stream_prompt(
- pool: &SessionPool,
- thread_key: &str,
- content_blocks: Vec,
- ctx: &Context,
- channel: ChannelId,
- msg_id: MessageId,
- reactions: Arc,
-) -> anyhow::Result<()> {
- let reactions = reactions.clone();
-
- pool.with_connection(thread_key, |conn| {
- let content_blocks = content_blocks.clone();
- let ctx = ctx.clone();
- let reactions = reactions.clone();
- Box::pin(async move {
- let reset = conn.session_reset;
- conn.session_reset = false;
-
- let (mut rx, _): (_, _) = conn.session_prompt(content_blocks).await?;
- reactions.set_thinking().await;
-
- let initial = if reset {
- "β οΈ _Session expired, starting fresh..._\n\n...".to_string()
- } else {
- "...".to_string()
- };
- let (buf_tx, buf_rx) = watch::channel(initial);
-
- let mut text_buf = String::new();
- // Tool calls indexed by toolCallId. Vec preserves first-seen
- // order. We store id + title + state separately so a ToolDone
- // event that arrives without a refreshed title (claude-agent-acp's
- // update events don't always re-send the title field) can still
- // reuse the title we already learned from a prior
- // tool_call_update β only the icon flips π§ β β
/ β. Rendering
- // happens on the fly in compose_display().
- let mut tool_lines: Vec = Vec::new();
- let current_msg_id = msg_id;
-
- if reset {
- text_buf.push_str("β οΈ _Session expired, starting fresh..._\n\n");
- }
-
- // Spawn edit-streaming task β only edits the single message, never sends new ones.
- // Long content is truncated during streaming; final multi-message split happens after.
- let edit_handle = {
- let ctx = ctx.clone();
- let mut buf_rx = buf_rx.clone();
- tokio::spawn(async move {
- let mut last_content = String::new();
- loop {
- tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
- if buf_rx.has_changed().unwrap_or(false) {
- let content = buf_rx.borrow_and_update().clone();
- if content != last_content {
- let display = if content.chars().count() > 1900 {
- // Tail-priority: keep the last 1900 chars so the
- // user always sees the most recent agent output
- // (e.g. a confirmation prompt) instead of old tool lines.
- let total = content.chars().count();
- let skip = total - 1900;
- let truncated: String = content.chars().skip(skip).collect();
- format!("β¦(truncated)\n{truncated}")
- } else {
- content.clone()
- };
- let _ = edit(&ctx, channel, msg_id, &display).await;
- last_content = content;
- }
- }
- if buf_rx.has_changed().is_err() {
- break;
- }
- }
- })
- };
-
- // Process ACP notifications
- let mut got_first_text = false;
- let mut response_error: Option = None;
- while let Some(notification) = rx.recv().await {
- if notification.id.is_some() {
- // Capture error from ACP response to display in Discord
- if let Some(ref err) = notification.error {
- response_error = Some(format_coded_error(err.code, &err.message));
- }
- break;
- }
-
- if let Some(event) = classify_notification(¬ification) {
- match event {
- AcpEvent::Text(t) => {
- if !got_first_text {
- got_first_text = true;
- // Reaction: back to thinking after tools
- }
- text_buf.push_str(&t);
- let _ = buf_tx.send(compose_display(&tool_lines, &text_buf, true));
- }
- AcpEvent::Thinking => {
- reactions.set_thinking().await;
- }
- AcpEvent::ToolStart { id, title } if !title.is_empty() => {
- reactions.set_tool(&title).await;
- let title = sanitize_title(&title);
- // Dedupe by toolCallId: replace if we've already
- // seen this id, otherwise append a new entry.
- // claude-agent-acp emits a placeholder title
- // ("Terminal", "Edit", etc.) on the first event
- // and refines it via tool_call_update; without
- // dedup the placeholder and refined version
- // appear as two separate orphaned lines.
- if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) {
- slot.title = title;
- slot.state = ToolState::Running;
- } else {
- tool_lines.push(ToolEntry {
- id,
- title,
- state: ToolState::Running,
- });
- }
- let _ = buf_tx.send(compose_display(&tool_lines, &text_buf, true));
- }
- AcpEvent::ToolDone { id, title, status } => {
- reactions.set_thinking().await;
- let new_state = if status == "completed" {
- ToolState::Completed
- } else {
- ToolState::Failed
- };
- // Find by id (the title is unreliable β substring
- // match against the placeholder "Terminal" would
- // never find the refined entry). Preserve the
- // existing title if the Done event omits it.
- if let Some(slot) = tool_lines.iter_mut().find(|e| e.id == id) {
- if !title.is_empty() {
- slot.title = sanitize_title(&title);
- }
- slot.state = new_state;
- } else if !title.is_empty() {
- // Done arrived without a prior Start (rare
- // race) β record it so we still show
- // something.
- tool_lines.push(ToolEntry {
- id,
- title: sanitize_title(&title),
- state: new_state,
- });
- }
- let _ = buf_tx.send(compose_display(&tool_lines, &text_buf, true));
- }
- _ => {}
- }
- }
- }
-
- conn.prompt_done().await;
- drop(buf_tx);
- let _ = edit_handle.await;
-
- // Final edit
- let final_content = compose_display(&tool_lines, &text_buf, false);
- // If ACP returned both an error and partial text, show both.
- // This can happen when the agent started producing content before hitting an error
- // (e.g. context length limit, rate limit mid-stream). Showing both gives users
- // full context rather than hiding the partial response.
- let final_content = if final_content.is_empty() {
- if let Some(err) = response_error {
- format!("β οΈ {}", err)
- } else {
- "_(no response)_".to_string()
- }
- } else if let Some(err) = response_error {
- format!("β οΈ {}\n\n{}", err, final_content)
- } else {
- final_content
- };
-
- let chunks = format::split_message(&final_content, 2000);
- for (i, chunk) in chunks.iter().enumerate() {
- if i == 0 {
- let _ = edit(&ctx, channel, current_msg_id, chunk).await;
- } else {
- let _ = channel.say(&ctx.http, chunk).await;
- }
- }
-
- Ok(())
- })
- })
- .await
-}
-
-/// Flatten a tool-call title into a single line that's safe to render
-/// inside Discord inline-code spans. Discord renders single-backtick
-/// code on a single line only, so multi-line shell commands (heredocs,
-/// `&&`-chained commands split across lines) appear truncated; we
-/// collapse newlines to ` ; ` and rewrite embedded backticks so they
-/// don't break the wrapping span.
-fn sanitize_title(title: &str) -> String {
- title.replace('\r', "").replace('\n', " ; ").replace('`', "'")
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum ToolState {
- Running,
- Completed,
- Failed,
-}
-
-#[derive(Debug, Clone)]
-struct ToolEntry {
- id: String,
- title: String,
- state: ToolState,
-}
-
-impl ToolEntry {
- fn render(&self) -> String {
- let icon = match self.state {
- ToolState::Running => "π§",
- ToolState::Completed => "β
",
- ToolState::Failed => "β",
- };
- let suffix = if self.state == ToolState::Running { "..." } else { "" };
- format!("{icon} `{}`{}", self.title, suffix)
- }
-}
-
-/// Maximum number of finished (or running) tool entries to show individually
-/// during streaming before collapsing into a summary line.
-///
-/// A typical tool line is 40β80 chars (icon + backtick title + suffix).
-/// At 3 lines β 120β240 chars, consuming 6β13 % of the 1900-char Discord
-/// streaming budget, leaving 1660+ chars for agent text. Beyond 3, tool
-/// titles tend to grow (full shell commands, URLs) so budget consumption
-/// rises non-linearly. 3 is also the practical "glanceable" limit.
-const TOOL_COLLAPSE_THRESHOLD: usize = 3;
-
-fn compose_display(tool_lines: &[ToolEntry], text: &str, streaming: bool) -> String {
- let mut out = String::new();
- if !tool_lines.is_empty() {
- if streaming {
- let done = tool_lines.iter().filter(|e| e.state == ToolState::Completed).count();
- let failed = tool_lines.iter().filter(|e| e.state == ToolState::Failed).count();
- let running: Vec<_> = tool_lines.iter().filter(|e| e.state == ToolState::Running).collect();
- let finished = done + failed;
-
- if finished <= TOOL_COLLAPSE_THRESHOLD {
- for entry in tool_lines.iter().filter(|e| e.state != ToolState::Running) {
- out.push_str(&entry.render());
- out.push('\n');
- }
- } else {
- let mut parts = Vec::new();
- if done > 0 { parts.push(format!("β
{done}")); }
- if failed > 0 { parts.push(format!("β {failed}")); }
- out.push_str(&format!("{} tool(s) completed\n", parts.join(" Β· ")));
- }
-
- if running.len() <= TOOL_COLLAPSE_THRESHOLD {
- for entry in &running {
- out.push_str(&entry.render());
- out.push('\n');
- }
- } else {
- // Parallel running tools exceed threshold β show last N + summary
- let hidden = running.len() - TOOL_COLLAPSE_THRESHOLD;
- out.push_str(&format!("π§ {hidden} more running\n"));
- for entry in running.iter().skip(hidden) {
- out.push_str(&entry.render());
- out.push('\n');
- }
- }
- } else {
- for entry in tool_lines {
- out.push_str(&entry.render());
- out.push('\n');
- }
- }
- if !out.is_empty() { out.push('\n'); }
- }
- out.push_str(text.trim_end());
- out
+ let trigger_ref = discord_msg_ref(msg);
+ adapter.create_thread(&parent, &trigger_ref, &thread_name).await
}
static ROLE_MENTION_RE: LazyLock = LazyLock::new(|| {
@@ -819,225 +411,3 @@ fn resolve_mentions(content: &str, bot_id: UserId, mentions: &[User]) -> String
let out = USER_MENTION_RE.replace_all(&out, "@(user)").to_string();
out.trim().to_string()
}
-
-fn shorten_thread_name(prompt: &str) -> String {
- // Strip @(role) and @(user) placeholders left by resolve_mentions()
- let cleaned = prompt.replace("@(role)", "").replace("@(user)", "");
- // Shorten GitHub URLs: https://github.com/owner/repo/issues/123 β owner/repo#123
- let re = regex::Regex::new(r"https?://github\.com/([^/]+/[^/]+)/(issues|pull)/(\d+)").unwrap();
- let shortened = re.replace_all(cleaned.trim(), "$1#$3");
- let name: String = shortened.chars().take(40).collect();
- if name.len() < shortened.len() {
- format!("{name}...")
- } else {
- name
- }
-}
-
-async fn get_or_create_thread(ctx: &Context, msg: &Message, prompt: &str) -> anyhow::Result {
- let channel = msg.channel_id.to_channel(&ctx.http).await?;
- if let serenity::model::channel::Channel::Guild(ref gc) = channel {
- if gc.thread_metadata.is_some() {
- return Ok(msg.channel_id.get());
- }
- }
-
- let thread_name = shorten_thread_name(prompt);
-
- let thread = msg
- .channel_id
- .create_thread_from_message(
- &ctx.http,
- msg.id,
- serenity::builder::CreateThread::new(thread_name)
- .auto_archive_duration(serenity::model::channel::AutoArchiveDuration::OneDay),
- )
- .await?;
-
- Ok(thread.id.get())
-}
-
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn make_png(width: u32, height: u32) -> Vec {
- let img = image::RgbImage::new(width, height);
- let mut buf = Cursor::new(Vec::new());
- img.write_to(&mut buf, image::ImageFormat::Png).unwrap();
- buf.into_inner()
- }
-
- #[test]
- fn large_image_resized_to_max_dimension() {
- let png = make_png(3000, 2000);
- let (compressed, mime) = resize_and_compress(&png).unwrap();
-
- assert_eq!(mime, "image/jpeg");
- let result = image::load_from_memory(&compressed).unwrap();
- assert!(result.width() <= IMAGE_MAX_DIMENSION_PX);
- assert!(result.height() <= IMAGE_MAX_DIMENSION_PX);
- }
-
- #[test]
- fn small_image_keeps_original_dimensions() {
- let png = make_png(800, 600);
- let (compressed, mime) = resize_and_compress(&png).unwrap();
-
- assert_eq!(mime, "image/jpeg");
- let result = image::load_from_memory(&compressed).unwrap();
- assert_eq!(result.width(), 800);
- assert_eq!(result.height(), 600);
- }
-
- #[test]
- fn landscape_image_respects_aspect_ratio() {
- let png = make_png(4000, 2000);
- let (compressed, _) = resize_and_compress(&png).unwrap();
-
- let result = image::load_from_memory(&compressed).unwrap();
- assert_eq!(result.width(), 1200);
- assert_eq!(result.height(), 600);
- }
-
- #[test]
- fn portrait_image_respects_aspect_ratio() {
- let png = make_png(2000, 4000);
- let (compressed, _) = resize_and_compress(&png).unwrap();
-
- let result = image::load_from_memory(&compressed).unwrap();
- assert_eq!(result.width(), 600);
- assert_eq!(result.height(), 1200);
- }
-
- #[test]
- fn compressed_output_is_smaller_than_original() {
- let png = make_png(3000, 2000);
- let (compressed, _) = resize_and_compress(&png).unwrap();
-
- assert!(compressed.len() < png.len(), "compressed {} should be < original {}", compressed.len(), png.len());
- }
-
- #[test]
- fn gif_passes_through_unchanged() {
- // Minimal valid GIF89a (1x1 pixel)
- let gif: Vec = vec![
- 0x47, 0x49, 0x46, 0x38, 0x39, 0x61, // GIF89a
- 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, // logical screen descriptor
- 0x2C, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, // image descriptor
- 0x02, 0x02, 0x44, 0x01, 0x00, // image data
- 0x3B, // trailer
- ];
- let (output, mime) = resize_and_compress(&gif).unwrap();
-
- assert_eq!(mime, "image/gif");
- assert_eq!(output, gif);
- }
-
- #[test]
- fn invalid_data_returns_error() {
- let garbage = vec![0x00, 0x01, 0x02, 0x03];
- assert!(resize_and_compress(&garbage).is_err());
- }
-
- // --- compose_display tests ---
-
- fn tool(id: &str, title: &str, state: ToolState) -> ToolEntry {
- ToolEntry { id: id.to_string(), title: title.to_string(), state }
- }
-
- #[test]
- fn compose_display_at_threshold_shows_individual_lines() {
- let tools = vec![
- tool("1", "cmd-a", ToolState::Completed),
- tool("2", "cmd-b", ToolState::Completed),
- tool("3", "cmd-c", ToolState::Completed),
- ];
- let out = compose_display(&tools, "hello", true);
- assert!(out.contains("β
`cmd-a`"), "should show individual tool");
- assert!(out.contains("β
`cmd-b`"), "should show individual tool");
- assert!(out.contains("β
`cmd-c`"), "should show individual tool");
- assert!(!out.contains("tool(s) completed"), "should not collapse at threshold");
- }
-
- #[test]
- fn compose_display_above_threshold_collapses() {
- let tools = vec![
- tool("1", "cmd-a", ToolState::Completed),
- tool("2", "cmd-b", ToolState::Completed),
- tool("3", "cmd-c", ToolState::Completed),
- tool("4", "cmd-d", ToolState::Completed),
- ];
- let out = compose_display(&tools, "hello", true);
- assert!(out.contains("β
4 tool(s) completed"), "should collapse above threshold");
- assert!(!out.contains("`cmd-a`"), "individual tools should be hidden");
- }
-
- #[test]
- fn compose_display_mixed_completed_and_failed() {
- let tools = vec![
- tool("1", "ok-1", ToolState::Completed),
- tool("2", "ok-2", ToolState::Completed),
- tool("3", "ok-3", ToolState::Completed),
- tool("4", "fail-1", ToolState::Failed),
- tool("5", "fail-2", ToolState::Failed),
- ];
- let out = compose_display(&tools, "", true);
- assert!(out.contains("β
3 Β· β 2 tool(s) completed"));
- }
-
- #[test]
- fn compose_display_running_shown_alongside_collapsed() {
- let tools = vec![
- tool("1", "done-1", ToolState::Completed),
- tool("2", "done-2", ToolState::Completed),
- tool("3", "done-3", ToolState::Completed),
- tool("4", "done-4", ToolState::Completed),
- tool("5", "active", ToolState::Running),
- ];
- let out = compose_display(&tools, "text", true);
- assert!(out.contains("β
4 tool(s) completed"));
- assert!(out.contains("π§ `active`..."));
- assert!(out.contains("text"));
- }
-
- #[test]
- fn compose_display_parallel_running_guard() {
- let tools: Vec<_> = (0..5)
- .map(|i| tool(&i.to_string(), &format!("run-{i}"), ToolState::Running))
- .collect();
- let out = compose_display(&tools, "", true);
- assert!(out.contains("π§ 2 more running"), "should collapse excess running tools");
- assert!(out.contains("π§ `run-3`..."), "should show recent running");
- assert!(out.contains("π§ `run-4`..."), "should show recent running");
- }
-
- #[test]
- fn compose_display_non_streaming_shows_all() {
- let tools = vec![
- tool("1", "cmd-a", ToolState::Completed),
- tool("2", "cmd-b", ToolState::Completed),
- tool("3", "cmd-c", ToolState::Completed),
- tool("4", "cmd-d", ToolState::Completed),
- tool("5", "cmd-e", ToolState::Failed),
- ];
- let out = compose_display(&tools, "final", false);
- assert!(out.contains("β
`cmd-a`"));
- assert!(out.contains("β
`cmd-d`"));
- assert!(out.contains("β `cmd-e`"));
- assert!(out.contains("final"));
- assert!(!out.contains("tool(s) completed"), "non-streaming should not collapse");
- }
-
- #[test]
- fn tail_truncation_preserves_multibyte_chars() {
- let content = "δ½ ε₯½δΈηπabcdefghij";
- let limit = 10;
- let total = content.chars().count();
- let skip = total.saturating_sub(limit);
- let truncated: String = content.chars().skip(skip).collect();
- assert_eq!(truncated.chars().count(), limit);
- assert!(truncated.ends_with("abcdefghij"));
- }
-}
diff --git a/src/format.rs b/src/format.rs
index 44906e7..56f0fad 100644
--- a/src/format.rs
+++ b/src/format.rs
@@ -42,3 +42,21 @@ pub fn split_message(text: &str, limit: usize) -> Vec {
}
chunks
}
+
+/// Shorten a prompt into a thread title: collapse GitHub URLs and cap at 40 chars.
+pub fn shorten_thread_name(prompt: &str) -> String {
+ use std::sync::LazyLock;
+ static GH_RE: LazyLock = LazyLock::new(|| {
+ regex::Regex::new(r"https?://github\.com/([^/]+/[^/]+)/(issues|pull)/(\d+)").unwrap()
+ });
+ // Strip @(role) and @(user) placeholders left by resolve_mentions()
+ let cleaned = prompt.replace("@(role)", "").replace("@(user)", "");
+ let shortened = GH_RE.replace_all(cleaned.trim(), "$1#$3");
+ let name: String = shortened.chars().take(40).collect();
+ if name.len() < shortened.len() {
+ format!("{name}...")
+ } else {
+ name
+ }
+}
+
diff --git a/src/main.rs b/src/main.rs
index 59330ab..52b986d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,22 +1,26 @@
mod acp;
+mod adapter;
mod config;
mod discord;
mod error_display;
mod format;
+mod media;
mod reactions;
mod setup;
+mod slack;
mod stt;
+use adapter::AdapterRouter;
use clap::Parser;
use serenity::prelude::*;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
-use tracing::info;
+use tracing::{error, info};
#[derive(Parser)]
#[command(name = "openab")]
-#[command(about = "Discord bot that manages ACP agent sessions", long_about = None)]
+#[command(about = "Multi-platform ACP agent broker (Discord, Slack)", long_about = None)]
struct Cli {
#[command(subcommand)]
command: Option,
@@ -51,7 +55,7 @@ async fn main() -> anyhow::Result<()> {
match cmd {
Commands::Setup { output } => {
setup::run_setup(output.map(PathBuf::from))?;
- return Ok(());
+ Ok(())
}
Commands::Run { config } => {
let config_path = config
@@ -62,22 +66,20 @@ async fn main() -> anyhow::Result<()> {
info!(
agent_cmd = %cfg.agent.command,
pool_max = cfg.pool.max_sessions,
- channels = ?cfg.discord.allowed_channels,
- users = ?cfg.discord.allowed_users,
+ discord = cfg.discord.is_some(),
+ slack = cfg.slack.is_some(),
reactions = cfg.reactions.enabled,
- allow_bot_messages = ?cfg.discord.allow_bot_messages,
"config loaded"
);
+ if cfg.discord.is_none() && cfg.slack.is_none() {
+ anyhow::bail!("no adapter configured β add [discord] and/or [slack] to config.toml");
+ }
+
let pool = Arc::new(acp::SessionPool::new(cfg.agent, cfg.pool.max_sessions));
let ttl_secs = cfg.pool.session_ttl_hours * 3600;
- let allowed_channels = parse_id_set(&cfg.discord.allowed_channels, "allowed_channels")?;
- let allowed_users = parse_id_set(&cfg.discord.allowed_users, "allowed_users")?;
- let trusted_bot_ids = parse_id_set(&cfg.discord.trusted_bot_ids, "trusted_bot_ids")?;
- info!(channels = allowed_channels.len(), users = allowed_users.len(), trusted_bots = ?trusted_bot_ids, "parsed allowlists");
-
- // Resolve STT config before constructing handler (auto-detect mutates cfg.stt)
+ // Resolve STT config (auto-detect GROQ_API_KEY from env)
if cfg.stt.enabled {
if cfg.stt.api_key.is_empty() && cfg.stt.base_url.contains("groq.com") {
if let Ok(key) = std::env::var("GROQ_API_KEY") {
@@ -93,23 +95,10 @@ async fn main() -> anyhow::Result<()> {
info!(model = %cfg.stt.model, base_url = %cfg.stt.base_url, "STT enabled");
}
- let handler = discord::Handler {
- pool: pool.clone(),
- allowed_channels,
- allowed_users,
- reactions_config: cfg.reactions,
- stt_config: cfg.stt.clone(),
- allow_bot_messages: cfg.discord.allow_bot_messages,
- trusted_bot_ids,
- };
-
- let intents = GatewayIntents::GUILD_MESSAGES
- | GatewayIntents::MESSAGE_CONTENT
- | GatewayIntents::GUILDS;
+ let router = Arc::new(AdapterRouter::new(pool.clone(), cfg.reactions));
- let mut client = Client::builder(&cfg.discord.bot_token, intents)
- .event_handler(handler)
- .await?;
+ // Shutdown signal for Slack adapter
+ let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
// Spawn cleanup task
let cleanup_pool = pool.clone();
@@ -120,20 +109,91 @@ async fn main() -> anyhow::Result<()> {
}
});
- // Run bot until SIGINT/SIGTERM
- let shard_manager = client.shard_manager.clone();
- let shutdown_pool = pool.clone();
- tokio::spawn(async move {
+ // Spawn Slack adapter (background task)
+ let slack_handle = if let Some(slack_cfg) = cfg.slack {
+ info!(
+ channels = slack_cfg.allowed_channels.len(),
+ users = slack_cfg.allowed_users.len(),
+ "starting slack adapter"
+ );
+ let router = router.clone();
+ let stt = cfg.stt.clone();
+ Some(tokio::spawn(async move {
+ if let Err(e) = slack::run_slack_adapter(
+ slack_cfg.bot_token,
+ slack_cfg.app_token,
+ slack_cfg.allowed_channels.into_iter().collect(),
+ slack_cfg.allowed_users.into_iter().collect(),
+ stt,
+ router,
+ shutdown_rx,
+ )
+ .await
+ {
+ error!("slack adapter error: {e}");
+ }
+ }))
+ } else {
+ None
+ };
+
+ // Run Discord adapter (foreground, blocking) or wait for ctrl_c
+ if let Some(discord_cfg) = cfg.discord {
+ let allowed_channels =
+ parse_id_set(&discord_cfg.allowed_channels, "discord.allowed_channels")?;
+ let allowed_users = parse_id_set(&discord_cfg.allowed_users, "discord.allowed_users")?;
+ let trusted_bot_ids = parse_id_set(&discord_cfg.trusted_bot_ids, "discord.trusted_bot_ids")?;
+ info!(
+ channels = allowed_channels.len(),
+ users = allowed_users.len(),
+ trusted_bots = trusted_bot_ids.len(),
+ allow_bot_messages = ?discord_cfg.allow_bot_messages,
+ "starting discord adapter"
+ );
+
+ let handler = discord::Handler {
+ router,
+ allowed_channels,
+ allowed_users,
+ stt_config: cfg.stt.clone(),
+ adapter: std::sync::OnceLock::new(),
+ allow_bot_messages: discord_cfg.allow_bot_messages,
+ trusted_bot_ids,
+ };
+
+ let intents = GatewayIntents::GUILD_MESSAGES
+ | GatewayIntents::MESSAGE_CONTENT
+ | GatewayIntents::GUILDS;
+
+ let mut client = Client::builder(&discord_cfg.bot_token, intents)
+ .event_handler(handler)
+ .await?;
+
+ // Graceful Discord shutdown on ctrl_c
+ let shard_manager = client.shard_manager.clone();
+ tokio::spawn(async move {
+ tokio::signal::ctrl_c().await.ok();
+ info!("shutdown signal received");
+ shard_manager.shutdown_all().await;
+ });
+
+ info!("discord bot running");
+ client.start().await?;
+ } else {
+ // No Discord β just wait for ctrl_c
+ info!("running without discord, press ctrl+c to stop");
tokio::signal::ctrl_c().await.ok();
info!("shutdown signal received");
- shard_manager.shutdown_all().await;
- });
-
- info!("starting discord bot");
- client.start().await?;
+ }
// Cleanup
cleanup_handle.abort();
+ // Signal Slack adapter to shut down gracefully
+ let _ = shutdown_tx.send(true);
+ if let Some(handle) = slack_handle {
+ let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+ }
+ let shutdown_pool = pool;
shutdown_pool.shutdown().await;
info!("openab shut down");
Ok(())
@@ -153,7 +213,9 @@ fn parse_id_set(raw: &[String], label: &str) -> anyhow::Result> {
})
.collect();
if !raw.is_empty() && set.is_empty() {
- anyhow::bail!("all {label} entries failed to parse β refusing to start with an empty allowlist");
+ anyhow::bail!(
+ "all {label} entries failed to parse β refusing to start with an empty allowlist"
+ );
}
Ok(set)
}
diff --git a/src/media.rs b/src/media.rs
new file mode 100644
index 0000000..709f788
--- /dev/null
+++ b/src/media.rs
@@ -0,0 +1,266 @@
+use crate::acp::ContentBlock;
+use crate::config::SttConfig;
+use base64::engine::general_purpose::STANDARD as BASE64;
+use base64::Engine;
+use image::ImageReader;
+use std::io::Cursor;
+use std::sync::LazyLock;
+use tracing::{debug, error};
+
+/// Reusable HTTP client for downloading attachments (shared across adapters).
+pub static HTTP_CLIENT: LazyLock = LazyLock::new(|| {
+ reqwest::Client::builder()
+ .timeout(std::time::Duration::from_secs(30))
+ .build()
+ .expect("static HTTP client must build")
+});
+
+/// Maximum dimension (width or height) for resized images.
+const IMAGE_MAX_DIMENSION_PX: u32 = 1200;
+
+/// JPEG quality for compressed output.
+const IMAGE_JPEG_QUALITY: u8 = 75;
+
+/// Download an image from a URL, resize/compress it, and return as a ContentBlock.
+/// Pass `auth_token` for platforms that require authentication (e.g. Slack private files).
+pub async fn download_and_encode_image(
+ url: &str,
+ mime_hint: Option<&str>,
+ filename: &str,
+ size: u64,
+ auth_token: Option<&str>,
+) -> Option {
+ const MAX_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
+
+ if url.is_empty() {
+ return None;
+ }
+
+ let mime = mime_hint.or_else(|| {
+ filename
+ .rsplit('.')
+ .next()
+ .and_then(|ext| match ext.to_lowercase().as_str() {
+ "png" => Some("image/png"),
+ "jpg" | "jpeg" => Some("image/jpeg"),
+ "gif" => Some("image/gif"),
+ "webp" => Some("image/webp"),
+ _ => None,
+ })
+ });
+
+ let Some(mime) = mime else {
+ debug!(filename, "skipping non-image attachment");
+ return None;
+ };
+ let mime = mime.split(';').next().unwrap_or(mime).trim();
+ if !mime.starts_with("image/") {
+ debug!(filename, mime, "skipping non-image attachment");
+ return None;
+ }
+
+ if size > MAX_SIZE {
+ error!(filename, size, "image exceeds 10MB limit");
+ return None;
+ }
+
+ let mut req = HTTP_CLIENT.get(url);
+ if let Some(token) = auth_token {
+ req = req.header("Authorization", format!("Bearer {token}"));
+ }
+
+ let response = match req.send().await {
+ Ok(resp) => resp,
+ Err(e) => { error!(url, error = %e, "download failed"); return None; }
+ };
+ if !response.status().is_success() {
+ error!(url, status = %response.status(), "HTTP error downloading image");
+ return None;
+ }
+ let bytes = match response.bytes().await {
+ Ok(b) => b,
+ Err(e) => { error!(url, error = %e, "read failed"); return None; }
+ };
+
+ if bytes.len() as u64 > MAX_SIZE {
+ error!(filename, size = bytes.len(), "downloaded image exceeds limit");
+ return None;
+ }
+
+ let (output_bytes, output_mime) = match resize_and_compress(&bytes) {
+ Ok(result) => result,
+ Err(e) => {
+ if bytes.len() > 1024 * 1024 {
+ error!(filename, error = %e, size = bytes.len(), "resize failed and original too large, skipping");
+ return None;
+ }
+ debug!(filename, error = %e, "resize failed, using original");
+ (bytes.to_vec(), mime.to_string())
+ }
+ };
+
+ debug!(
+ filename,
+ original_size = bytes.len(),
+ compressed_size = output_bytes.len(),
+ "image processed"
+ );
+
+ let encoded = BASE64.encode(&output_bytes);
+ Some(ContentBlock::Image {
+ media_type: output_mime,
+ data: encoded,
+ })
+}
+
+/// Download an audio file and transcribe it via the configured STT provider.
+/// Pass `auth_token` for platforms that require authentication.
+pub async fn download_and_transcribe(
+ url: &str,
+ filename: &str,
+ mime_type: &str,
+ size: u64,
+ stt_config: &SttConfig,
+ auth_token: Option<&str>,
+) -> Option {
+ const MAX_SIZE: u64 = 25 * 1024 * 1024; // 25 MB (Whisper API limit)
+
+ if size > MAX_SIZE {
+ error!(filename, size, "audio exceeds 25MB limit");
+ return None;
+ }
+
+ let mut req = HTTP_CLIENT.get(url);
+ if let Some(token) = auth_token {
+ req = req.header("Authorization", format!("Bearer {token}"));
+ }
+
+ let resp = req.send().await.ok()?;
+ if !resp.status().is_success() {
+ error!(url, status = %resp.status(), "audio download failed");
+ return None;
+ }
+ let bytes = resp.bytes().await.ok()?.to_vec();
+
+ crate::stt::transcribe(&HTTP_CLIENT, stt_config, bytes, filename.to_string(), mime_type).await
+}
+
+/// Resize image so longest side <= IMAGE_MAX_DIMENSION_PX, then encode as JPEG.
+/// GIFs are passed through unchanged to preserve animation.
+pub fn resize_and_compress(raw: &[u8]) -> Result<(Vec, String), image::ImageError> {
+ let reader = ImageReader::new(Cursor::new(raw))
+ .with_guessed_format()?;
+
+ let format = reader.format();
+
+ if format == Some(image::ImageFormat::Gif) {
+ return Ok((raw.to_vec(), "image/gif".to_string()));
+ }
+
+ let img = reader.decode()?;
+ let (w, h) = (img.width(), img.height());
+
+ let img = if w > IMAGE_MAX_DIMENSION_PX || h > IMAGE_MAX_DIMENSION_PX {
+ let max_side = std::cmp::max(w, h);
+ let ratio = f64::from(IMAGE_MAX_DIMENSION_PX) / f64::from(max_side);
+ let new_w = (f64::from(w) * ratio) as u32;
+ let new_h = (f64::from(h) * ratio) as u32;
+ img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3)
+ } else {
+ img
+ };
+
+ let mut buf = Cursor::new(Vec::new());
+ let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, IMAGE_JPEG_QUALITY);
+ img.write_with_encoder(encoder)?;
+
+ Ok((buf.into_inner(), "image/jpeg".to_string()))
+}
+
+/// Check if a MIME type is audio.
+pub fn is_audio_mime(mime: &str) -> bool {
+ mime.starts_with("audio/")
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn make_png(width: u32, height: u32) -> Vec {
+ let img = image::RgbImage::new(width, height);
+ let mut buf = Cursor::new(Vec::new());
+ img.write_to(&mut buf, image::ImageFormat::Png).unwrap();
+ buf.into_inner()
+ }
+
+ #[test]
+ fn large_image_resized_to_max_dimension() {
+ let png = make_png(3000, 2000);
+ let (compressed, mime) = resize_and_compress(&png).unwrap();
+
+ assert_eq!(mime, "image/jpeg");
+ let result = image::load_from_memory(&compressed).unwrap();
+ assert!(result.width() <= IMAGE_MAX_DIMENSION_PX);
+ assert!(result.height() <= IMAGE_MAX_DIMENSION_PX);
+ }
+
+ #[test]
+ fn small_image_keeps_original_dimensions() {
+ let png = make_png(800, 600);
+ let (compressed, mime) = resize_and_compress(&png).unwrap();
+
+ assert_eq!(mime, "image/jpeg");
+ let result = image::load_from_memory(&compressed).unwrap();
+ assert_eq!(result.width(), 800);
+ assert_eq!(result.height(), 600);
+ }
+
+ #[test]
+ fn landscape_image_respects_aspect_ratio() {
+ let png = make_png(4000, 2000);
+ let (compressed, _) = resize_and_compress(&png).unwrap();
+
+ let result = image::load_from_memory(&compressed).unwrap();
+ assert_eq!(result.width(), 1200);
+ assert_eq!(result.height(), 600);
+ }
+
+ #[test]
+ fn portrait_image_respects_aspect_ratio() {
+ let png = make_png(2000, 4000);
+ let (compressed, _) = resize_and_compress(&png).unwrap();
+
+ let result = image::load_from_memory(&compressed).unwrap();
+ assert_eq!(result.width(), 600);
+ assert_eq!(result.height(), 1200);
+ }
+
+ #[test]
+ fn compressed_output_is_smaller_than_original() {
+ let png = make_png(3000, 2000);
+ let (compressed, _) = resize_and_compress(&png).unwrap();
+
+ assert!(compressed.len() < png.len(), "compressed {} should be < original {}", compressed.len(), png.len());
+ }
+
+ #[test]
+ fn gif_passes_through_unchanged() {
+ let gif: Vec = vec![
+ 0x47, 0x49, 0x46, 0x38, 0x39, 0x61,
+ 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
+ 0x2C, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00,
+ 0x02, 0x02, 0x44, 0x01, 0x00,
+ 0x3B,
+ ];
+ let (output, mime) = resize_and_compress(&gif).unwrap();
+
+ assert_eq!(mime, "image/gif");
+ assert_eq!(output, gif);
+ }
+
+ #[test]
+ fn invalid_data_returns_error() {
+ let garbage = vec![0x00, 0x01, 0x02, 0x03];
+ assert!(resize_and_compress(&garbage).is_err());
+ }
+}
diff --git a/src/reactions.rs b/src/reactions.rs
index 683d334..8638d86 100644
--- a/src/reactions.rs
+++ b/src/reactions.rs
@@ -1,7 +1,5 @@
+use crate::adapter::{ChatAdapter, MessageRef};
use crate::config::{ReactionEmojis, ReactionTiming};
-use serenity::http::Http;
-use serenity::model::channel::ReactionType;
-use serenity::model::id::{ChannelId, MessageId};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::Duration;
@@ -21,9 +19,8 @@ fn classify_tool<'a>(name: &str, emojis: &'a ReactionEmojis) -> &'a str {
}
struct Inner {
- http: Arc,
- channel: ChannelId,
- message: MessageId,
+ adapter: Arc,
+ message: MessageRef,
emojis: ReactionEmojis,
timing: ReactionTiming,
current: String,
@@ -41,16 +38,14 @@ pub struct StatusReactionController {
impl StatusReactionController {
pub fn new(
enabled: bool,
- http: Arc,
- channel: ChannelId,
- message: MessageId,
+ adapter: Arc,
+ message: MessageRef,
emojis: ReactionEmojis,
timing: ReactionTiming,
) -> Self {
Self {
inner: Arc::new(Mutex::new(Inner {
- http,
- channel,
+ adapter,
message,
emojis,
timing,
@@ -93,7 +88,7 @@ impl StatusReactionController {
let faces = ["π", "π", "π«‘", "π€", "π", "βοΈ", "πͺ", "π¦Ύ"];
let face = faces[rand::random::() % faces.len()];
let inner = self.inner.lock().await;
- let _ = add_reaction(&inner.http, inner.channel, inner.message, face).await;
+ let _ = inner.adapter.add_reaction(&inner.message, face).await;
}
pub async fn set_error(&self) {
@@ -108,7 +103,7 @@ impl StatusReactionController {
cancel_timers(&mut inner);
let current = inner.current.clone();
if !current.is_empty() {
- let _ = remove_reaction(&inner.http, inner.channel, inner.message, ¤t).await;
+ let _ = inner.adapter.remove_reaction(&inner.message, ¤t).await;
inner.current.clear();
}
}
@@ -121,15 +116,14 @@ impl StatusReactionController {
cancel_debounce(&mut inner);
let old = inner.current.clone();
inner.current = emoji.to_string();
- let http = inner.http.clone();
- let ch = inner.channel;
- let msg = inner.message;
+ let adapter = inner.adapter.clone();
+ let msg = inner.message.clone();
let new = emoji.to_string();
drop(inner);
- let _ = add_reaction(&http, ch, msg, &new).await;
+ let _ = adapter.add_reaction(&msg, &new).await;
if !old.is_empty() && old != new {
- let _ = remove_reaction(&http, ch, msg, &old).await;
+ let _ = adapter.remove_reaction(&msg, &old).await;
}
self.reset_stall_timers().await;
}
@@ -151,14 +145,13 @@ impl StatusReactionController {
if inner.finished { return; }
let old = inner.current.clone();
inner.current = emoji.clone();
- let http = inner.http.clone();
- let ch = inner.channel;
- let msg = inner.message;
+ let adapter = inner.adapter.clone();
+ let msg = inner.message.clone();
drop(inner);
- let _ = add_reaction(&http, ch, msg, &emoji).await;
+ let _ = adapter.add_reaction(&msg, &emoji).await;
if !old.is_empty() && old != emoji {
- let _ = remove_reaction(&http, ch, msg, &old).await;
+ let _ = adapter.remove_reaction(&msg, &old).await;
}
}));
self.reset_stall_timers_inner(&mut inner);
@@ -172,15 +165,14 @@ impl StatusReactionController {
let old = inner.current.clone();
inner.current = emoji.to_string();
- let http = inner.http.clone();
- let ch = inner.channel;
- let msg = inner.message;
+ let adapter = inner.adapter.clone();
+ let msg = inner.message.clone();
let new = emoji.to_string();
drop(inner);
- let _ = add_reaction(&http, ch, msg, &new).await;
+ let _ = adapter.add_reaction(&msg, &new).await;
if !old.is_empty() && old != new {
- let _ = remove_reaction(&http, ch, msg, &old).await;
+ let _ = adapter.remove_reaction(&msg, &old).await;
}
}
@@ -205,13 +197,12 @@ impl StatusReactionController {
if inner.finished { return; }
let old = inner.current.clone();
inner.current = "π₯±".to_string();
- let http = inner.http.clone();
- let ch = inner.channel;
- let msg = inner.message;
+ let adapter = inner.adapter.clone();
+ let msg = inner.message.clone();
drop(inner);
- let _ = add_reaction(&http, ch, msg, "π₯±").await;
+ let _ = adapter.add_reaction(&msg, "π₯±").await;
if !old.is_empty() && old != "π₯±" {
- let _ = remove_reaction(&http, ch, msg, &old).await;
+ let _ = adapter.remove_reaction(&msg, &old).await;
}
}
}));
@@ -222,13 +213,12 @@ impl StatusReactionController {
if inner.finished { return; }
let old = inner.current.clone();
inner.current = "π¨".to_string();
- let http = inner.http.clone();
- let ch = inner.channel;
- let msg = inner.message;
+ let adapter = inner.adapter.clone();
+ let msg = inner.message.clone();
drop(inner);
- let _ = add_reaction(&http, ch, msg, "π¨").await;
+ let _ = adapter.add_reaction(&msg, "π¨").await;
if !old.is_empty() && old != "π¨" {
- let _ = remove_reaction(&http, ch, msg, &old).await;
+ let _ = adapter.remove_reaction(&msg, &old).await;
}
}));
}
@@ -243,13 +233,3 @@ fn cancel_timers(inner: &mut Inner) {
if let Some(h) = inner.stall_soft_handle.take() { h.abort(); }
if let Some(h) = inner.stall_hard_handle.take() { h.abort(); }
}
-
-async fn add_reaction(http: &Http, ch: ChannelId, msg: MessageId, emoji: &str) -> serenity::Result<()> {
- let reaction = ReactionType::Unicode(emoji.to_string());
- http.create_reaction(ch, msg, &reaction).await
-}
-
-async fn remove_reaction(http: &Http, ch: ChannelId, msg: MessageId, emoji: &str) -> serenity::Result<()> {
- let reaction = ReactionType::Unicode(emoji.to_string());
- http.delete_reaction_me(ch, msg, &reaction).await
-}
diff --git a/src/slack.rs b/src/slack.rs
new file mode 100644
index 0000000..9779e6d
--- /dev/null
+++ b/src/slack.rs
@@ -0,0 +1,659 @@
+use crate::acp::ContentBlock;
+use crate::adapter::{AdapterRouter, ChatAdapter, ChannelRef, MessageRef, SenderContext};
+use crate::config::SttConfig;
+use crate::media;
+use anyhow::{anyhow, Result};
+use async_trait::async_trait;
+use futures_util::{SinkExt, StreamExt};
+use std::collections::{HashMap, HashSet};
+use std::sync::{Arc, LazyLock};
+use tokio::sync::watch;
+use tokio_tungstenite::tungstenite;
+use tracing::{debug, error, info, warn};
+
+const SLACK_API: &str = "https://slack.com/api";
+
+/// Map Unicode emoji to Slack short names for reactions API.
+/// Only covers the default `[reactions.emojis]` set. Custom emoji configured
+/// outside this map will fall back to `grey_question`.
+fn unicode_to_slack_emoji(unicode: &str) -> &str {
+ match unicode {
+ "π" => "eyes",
+ "π€" => "thinking_face",
+ "π₯" => "fire",
+ "π¨\u{200d}π»" => "technologist",
+ "β‘" => "zap",
+ "π" => "ok",
+ "π±" => "scream",
+ "π«" => "no_entry_sign",
+ "π" => "blush",
+ "π" => "sunglasses",
+ "π«‘" => "saluting_face",
+ "π€" => "nerd_face",
+ "π" => "smirk",
+ "β\u{fe0f}" => "v",
+ "πͺ" => "muscle",
+ "π¦Ύ" => "mechanical_arm",
+ "π₯±" => "yawning_face",
+ "π¨" => "fearful",
+ "β
" => "white_check_mark",
+ "β" => "x",
+ "π§" => "wrench",
+ _ => "grey_question",
+ }
+}
+
+// --- SlackAdapter: implements ChatAdapter for Slack ---
+
+/// TTL for cached user display names (5 minutes).
+const USER_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(300);
+
+pub struct SlackAdapter {
+ client: reqwest::Client,
+ bot_token: String,
+ bot_user_id: tokio::sync::OnceCell,
+ user_cache: tokio::sync::Mutex>,
+}
+
+impl SlackAdapter {
+ pub fn new(bot_token: String) -> Self {
+ Self {
+ client: reqwest::Client::new(),
+ bot_token,
+ bot_user_id: tokio::sync::OnceCell::new(),
+ user_cache: tokio::sync::Mutex::new(HashMap::new()),
+ }
+ }
+
+ /// Get the bot's own Slack user ID (cached after first call).
+ async fn get_bot_user_id(&self) -> Option<&str> {
+ self.bot_user_id.get_or_try_init(|| async {
+ let resp = self.api_post("auth.test", serde_json::json!({})).await
+ .map_err(|e| anyhow!("auth.test failed: {e}"))?;
+ resp["user_id"]
+ .as_str()
+ .map(|s| s.to_string())
+ .ok_or_else(|| anyhow!("no user_id in auth.test response"))
+ }).await.ok().map(|s| s.as_str())
+ }
+
+ async fn api_post(&self, method: &str, body: serde_json::Value) -> Result {
+ let resp = self
+ .client
+ .post(format!("{SLACK_API}/{method}"))
+ .header("Authorization", format!("Bearer {}", self.bot_token))
+ .header("Content-Type", "application/json; charset=utf-8")
+ .json(&body)
+ .send()
+ .await?;
+
+ let json: serde_json::Value = resp.json().await?;
+ if json["ok"].as_bool() != Some(true) {
+ let err = json["error"].as_str().unwrap_or("unknown error");
+ return Err(anyhow!("Slack API {method}: {err}"));
+ }
+ Ok(json)
+ }
+
+ /// Resolve a Slack user ID to display name via users.info API.
+ /// Results are cached for 5 minutes to avoid hitting Slack rate limits.
+ async fn resolve_user_name(&self, user_id: &str) -> Option {
+ // Check cache first
+ {
+ let cache = self.user_cache.lock().await;
+ if let Some((name, ts)) = cache.get(user_id) {
+ if ts.elapsed() < USER_CACHE_TTL {
+ return Some(name.clone());
+ }
+ }
+ }
+
+ let resp = self
+ .api_post(
+ "users.info",
+ serde_json::json!({ "user": user_id }),
+ )
+ .await
+ .ok()?;
+ let user = resp.get("user")?;
+ let profile = user.get("profile")?;
+ let display = profile
+ .get("display_name")
+ .and_then(|v| v.as_str())
+ .filter(|s| !s.is_empty());
+ let real = profile
+ .get("real_name")
+ .and_then(|v| v.as_str())
+ .filter(|s| !s.is_empty());
+ let name = user
+ .get("name")
+ .and_then(|v| v.as_str());
+ let resolved = display.or(real).or(name)?.to_string();
+
+ // Cache the result
+ self.user_cache.lock().await.insert(
+ user_id.to_string(),
+ (resolved.clone(), tokio::time::Instant::now()),
+ );
+
+ Some(resolved)
+ }
+}
+
+#[async_trait]
+impl ChatAdapter for SlackAdapter {
+ fn platform(&self) -> &'static str {
+ "slack"
+ }
+
+ fn message_limit(&self) -> usize {
+ 4000
+ }
+
+ async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result {
+ let mrkdwn = markdown_to_mrkdwn(content);
+ let mut body = serde_json::json!({
+ "channel": channel.channel_id,
+ "text": mrkdwn,
+ });
+ if let Some(thread_ts) = &channel.thread_id {
+ body["thread_ts"] = serde_json::Value::String(thread_ts.clone());
+ }
+ let resp = self.api_post("chat.postMessage", body).await?;
+ let ts = resp["ts"]
+ .as_str()
+ .ok_or_else(|| anyhow!("no ts in chat.postMessage response"))?;
+ Ok(MessageRef {
+ channel: ChannelRef {
+ platform: "slack".into(),
+ channel_id: channel.channel_id.clone(),
+ thread_id: channel.thread_id.clone(),
+ parent_id: None,
+ },
+ message_id: ts.to_string(),
+ })
+ }
+
+ async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()> {
+ let mrkdwn = markdown_to_mrkdwn(content);
+ self.api_post(
+ "chat.update",
+ serde_json::json!({
+ "channel": msg.channel.channel_id,
+ "ts": msg.message_id,
+ "text": mrkdwn,
+ }),
+ )
+ .await?;
+ Ok(())
+ }
+
+ async fn create_thread(
+ &self,
+ channel: &ChannelRef,
+ trigger_msg: &MessageRef,
+ _title: &str,
+ ) -> Result {
+ // Slack threads are implicit β posting with thread_ts creates/continues a thread.
+ Ok(ChannelRef {
+ platform: "slack".into(),
+ channel_id: channel.channel_id.clone(),
+ thread_id: Some(trigger_msg.message_id.clone()),
+ parent_id: None,
+ })
+ }
+
+ async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()> {
+ let name = unicode_to_slack_emoji(emoji);
+ match self.api_post(
+ "reactions.add",
+ serde_json::json!({
+ "channel": msg.channel.channel_id,
+ "timestamp": msg.message_id,
+ "name": name,
+ }),
+ )
+ .await
+ {
+ Ok(_) => Ok(()),
+ Err(e) if e.to_string().contains("already_reacted") => Ok(()),
+ Err(e) => Err(e),
+ }
+ }
+
+ async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()> {
+ let name = unicode_to_slack_emoji(emoji);
+ match self.api_post(
+ "reactions.remove",
+ serde_json::json!({
+ "channel": msg.channel.channel_id,
+ "timestamp": msg.message_id,
+ "name": name,
+ }),
+ )
+ .await
+ {
+ Ok(_) => Ok(()),
+ Err(e) if e.to_string().contains("no_reaction") => Ok(()),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+// --- Socket Mode event loop ---
+
+/// Run the Slack adapter using Socket Mode (persistent WebSocket, no public URL needed).
+/// Reconnects automatically on disconnect.
+pub async fn run_slack_adapter(
+ bot_token: String,
+ app_token: String,
+ allowed_channels: HashSet,
+ allowed_users: HashSet,
+ stt_config: SttConfig,
+ router: Arc,
+ mut shutdown_rx: watch::Receiver,
+) -> Result<()> {
+ let adapter = Arc::new(SlackAdapter::new(bot_token.clone()));
+
+ loop {
+ // Check for shutdown before (re)connecting
+ if *shutdown_rx.borrow() {
+ info!("Slack adapter shutting down");
+ return Ok(());
+ }
+
+ let ws_url = match get_socket_mode_url(&app_token).await {
+ Ok(url) => url,
+ Err(e) => {
+ error!("failed to get Socket Mode URL: {e}");
+ tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+ continue;
+ }
+ };
+ info!(url = %ws_url, "connecting to Slack Socket Mode");
+
+ match tokio_tungstenite::connect_async(&ws_url).await {
+ Ok((ws_stream, _)) => {
+ info!("Slack Socket Mode connected");
+ let (mut write, mut read) = ws_stream.split();
+
+ loop {
+ tokio::select! {
+ msg_result = read.next() => {
+ let Some(msg_result) = msg_result else { break };
+ match msg_result {
+ Ok(tungstenite::Message::Text(text)) => {
+ let envelope: serde_json::Value =
+ match serde_json::from_str(&text) {
+ Ok(v) => v,
+ Err(_) => continue,
+ };
+
+ // Acknowledge the envelope immediately
+ if let Some(envelope_id) = envelope["envelope_id"].as_str() {
+ let ack = serde_json::json!({"envelope_id": envelope_id});
+ let _ = write
+ .send(tungstenite::Message::Text(ack.to_string()))
+ .await;
+ }
+
+ // Route events
+ if envelope["type"].as_str() == Some("events_api") {
+ let event = &envelope["payload"]["event"];
+ let event_type = event["type"].as_str().unwrap_or("");
+ match event_type {
+ "app_mention" => {
+ let event = event.clone();
+ let adapter = adapter.clone();
+ let bot_token = bot_token.clone();
+ let allowed_channels = allowed_channels.clone();
+ let allowed_users = allowed_users.clone();
+ let stt_config = stt_config.clone();
+ let router = router.clone();
+ tokio::spawn(async move {
+ handle_message(
+ &event,
+ true,
+ &adapter,
+ &bot_token,
+ &allowed_channels,
+ &allowed_users,
+ &stt_config,
+ &router,
+ )
+ .await;
+ });
+ }
+ "message" => {
+ // Handle thread follow-ups without @mention.
+ // Skip bot messages and subtypes that aren't real user messages.
+ let has_thread = event["thread_ts"].is_string();
+ let is_bot = event["bot_id"].is_string()
+ || event["subtype"].as_str() == Some("bot_message");
+ let subtype = event["subtype"].as_str().unwrap_or("");
+ let has_files = event["files"].is_array();
+ // Skip messages that @mention the bot β app_mention handles those
+ let msg_text = event["text"].as_str().unwrap_or("");
+ let mentions_bot = if let Some(bot_id) = adapter.get_bot_user_id().await {
+ msg_text.contains(&format!("<@{bot_id}>"))
+ } else {
+ false
+ };
+ debug!(
+ has_thread,
+ is_bot,
+ subtype,
+ has_files,
+ mentions_bot,
+ text = msg_text,
+ "message event received"
+ );
+ let skip_subtype = matches!(subtype,
+ "message_changed" | "message_deleted" |
+ "channel_join" | "channel_leave" |
+ "channel_topic" | "channel_purpose"
+ );
+ if has_thread && !is_bot && !skip_subtype && !mentions_bot {
+ let event = event.clone();
+ let adapter = adapter.clone();
+ let bot_token = bot_token.clone();
+ let allowed_channels = allowed_channels.clone();
+ let allowed_users = allowed_users.clone();
+ let stt_config = stt_config.clone();
+ let router = router.clone();
+ tokio::spawn(async move {
+ handle_message(
+ &event,
+ false,
+ &adapter,
+ &bot_token,
+ &allowed_channels,
+ &allowed_users,
+ &stt_config,
+ &router,
+ )
+ .await;
+ });
+ }
+ }
+ _ => {}
+ }
+ }
+ }
+ Ok(tungstenite::Message::Ping(data)) => {
+ let _ = write.send(tungstenite::Message::Pong(data)).await;
+ }
+ Ok(tungstenite::Message::Close(_)) => {
+ warn!("Slack Socket Mode connection closed by server");
+ break;
+ }
+ Err(e) => {
+ error!("Socket Mode read error: {e}");
+ break;
+ }
+ _ => {}
+ }
+ }
+ _ = shutdown_rx.changed() => {
+ info!("Slack adapter received shutdown signal");
+ let _ = write.send(tungstenite::Message::Close(None)).await;
+ return Ok(());
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("failed to connect to Slack Socket Mode: {e}");
+ }
+ }
+
+ warn!("reconnecting to Slack Socket Mode in 5s...");
+ tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+ }
+}
+
+/// Call apps.connections.open to get a WebSocket URL for Socket Mode.
+async fn get_socket_mode_url(app_token: &str) -> Result {
+ let client = reqwest::Client::new();
+ let resp = client
+ .post(format!("{SLACK_API}/apps.connections.open"))
+ .header("Authorization", format!("Bearer {app_token}"))
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .send()
+ .await?;
+ let json: serde_json::Value = resp.json().await?;
+ if json["ok"].as_bool() != Some(true) {
+ let err = json["error"].as_str().unwrap_or("unknown");
+ return Err(anyhow!("apps.connections.open: {err}"));
+ }
+ json["url"]
+ .as_str()
+ .map(|s| s.to_string())
+ .ok_or_else(|| anyhow!("no url in apps.connections.open response"))
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn handle_message(
+ event: &serde_json::Value,
+ is_mention: bool,
+ adapter: &Arc,
+ bot_token: &str,
+ allowed_channels: &HashSet,
+ allowed_users: &HashSet,
+ stt_config: &SttConfig,
+ router: &Arc,
+) {
+ let channel_id = match event["channel"].as_str() {
+ Some(ch) => ch.to_string(),
+ None => return,
+ };
+ let user_id = match event["user"].as_str() {
+ Some(u) => u.to_string(),
+ None => return,
+ };
+ let text = match event["text"].as_str() {
+ Some(t) => t.to_string(),
+ None => return,
+ };
+ let ts = match event["ts"].as_str() {
+ Some(ts) => ts.to_string(),
+ None => return,
+ };
+ let thread_ts = event["thread_ts"].as_str().map(|s| s.to_string());
+
+ // Check allowed channels (empty = allow all)
+ if !allowed_channels.is_empty() && !allowed_channels.contains(&channel_id) {
+ return;
+ }
+
+ // Check allowed users
+ if !allowed_users.is_empty() && !allowed_users.contains(&user_id) {
+ tracing::info!(user_id, "denied Slack user, ignoring");
+ let msg_ref = MessageRef {
+ channel: ChannelRef {
+ platform: "slack".into(),
+ channel_id: channel_id.clone(),
+ thread_id: thread_ts.clone(),
+ parent_id: None,
+ },
+ message_id: ts.clone(),
+ };
+ let _ = adapter.add_reaction(&msg_ref, "π«").await;
+ return;
+ }
+
+ // Strip bot mention from text only for @mention events
+ let prompt = if is_mention {
+ strip_slack_mention(&text)
+ } else {
+ // Thread follow-up: check for bot loop before processing
+ if let Some(ref tts) = thread_ts {
+ if is_bot_loop(adapter, &channel_id, tts).await {
+ tracing::warn!(channel_id, thread_ts = tts, "bot loop detected, ignoring");
+ return;
+ }
+ }
+ text.trim().to_string()
+ };
+
+ // Process file attachments (images, audio)
+ let files = event["files"].as_array();
+ let has_files = files.is_some_and(|f| !f.is_empty());
+
+ if prompt.is_empty() && !has_files {
+ return;
+ }
+
+ let mut extra_blocks = Vec::new();
+ if let Some(files) = files {
+ for file in files {
+ let mimetype = file["mimetype"].as_str().unwrap_or("");
+ let filename = file["name"].as_str().unwrap_or("file");
+ let size = file["size"].as_u64().unwrap_or(0);
+ // Slack private files require Bearer token to download
+ let url = file["url_private_download"]
+ .as_str()
+ .or_else(|| file["url_private"].as_str())
+ .unwrap_or("");
+
+ if url.is_empty() {
+ continue;
+ }
+
+ if media::is_audio_mime(mimetype) {
+ if stt_config.enabled {
+ if let Some(transcript) = media::download_and_transcribe(
+ url,
+ filename,
+ mimetype,
+ size,
+ stt_config,
+ Some(bot_token),
+ ).await {
+ debug!(filename, chars = transcript.len(), "voice transcript injected");
+ extra_blocks.insert(0, ContentBlock::Text {
+ text: format!("[Voice message transcript]: {transcript}"),
+ });
+ }
+ } else {
+ debug!(filename, "skipping audio attachment (STT disabled)");
+ }
+ } else if let Some(block) = media::download_and_encode_image(
+ url,
+ Some(mimetype),
+ filename,
+ size,
+ Some(bot_token),
+ ).await {
+ debug!(filename, "adding image attachment");
+ extra_blocks.push(block);
+ }
+ }
+ }
+
+ // Resolve Slack display name (best-effort, fallback to user_id)
+ let display_name = adapter
+ .resolve_user_name(&user_id)
+ .await
+ .unwrap_or_else(|| user_id.clone());
+
+ let sender = SenderContext {
+ schema: "openab.sender.v1".into(),
+ sender_id: user_id.clone(),
+ sender_name: display_name.clone(),
+ display_name,
+ channel: "slack".into(),
+ channel_id: channel_id.clone(),
+ is_bot: false,
+ };
+
+ let trigger_msg = MessageRef {
+ channel: ChannelRef {
+ platform: "slack".into(),
+ channel_id: channel_id.clone(),
+ thread_id: thread_ts.clone(),
+ parent_id: None,
+ },
+ message_id: ts.clone(),
+ };
+
+ // Determine thread: if already in a thread, continue it; otherwise start a new thread
+ let thread_channel = ChannelRef {
+ platform: "slack".into(),
+ channel_id: channel_id.clone(),
+ thread_id: Some(thread_ts.unwrap_or(ts)),
+ parent_id: None,
+ };
+
+ let adapter_dyn: Arc = adapter.clone();
+ if let Err(e) = router
+ .handle_message(&adapter_dyn, &thread_channel, &sender, &prompt, extra_blocks, &trigger_msg)
+ .await
+ {
+ error!("Slack handle_message error: {e}");
+ }
+}
+
+static SLACK_MENTION_RE: LazyLock =
+ LazyLock::new(|| regex::Regex::new(r"<@[A-Z0-9]+>").unwrap());
+
+/// Hard cap on consecutive bot messages in a thread.
+/// Mirrors Discord's MAX_CONSECUTIVE_BOT_TURNS to prevent runaway loops.
+const MAX_CONSECUTIVE_BOT_TURNS: usize = 10;
+
+/// Check if the last N messages in a Slack thread are all from bots.
+async fn is_bot_loop(adapter: &SlackAdapter, channel: &str, thread_ts: &str) -> bool {
+ let resp = adapter
+ .api_post(
+ "conversations.replies",
+ serde_json::json!({
+ "channel": channel,
+ "ts": thread_ts,
+ "limit": MAX_CONSECUTIVE_BOT_TURNS + 1,
+ "inclusive": true,
+ }),
+ )
+ .await;
+
+ let Ok(json) = resp else { return false }; // fail-open on API error
+ let Some(messages) = json["messages"].as_array() else { return false };
+
+ // Skip the first message (thread parent), count consecutive bot messages from the end
+ let recent: Vec<_> = messages.iter().skip(1).rev().collect();
+ if recent.len() < MAX_CONSECUTIVE_BOT_TURNS {
+ return false;
+ }
+
+ recent
+ .iter()
+ .take(MAX_CONSECUTIVE_BOT_TURNS)
+ .all(|m| m["bot_id"].is_string() || m["subtype"].as_str() == Some("bot_message"))
+}
+
+fn strip_slack_mention(text: &str) -> String {
+ SLACK_MENTION_RE.replace_all(text, "").trim().to_string()
+}
+
+/// Convert Markdown (as output by Claude Code) to Slack mrkdwn format.
+fn markdown_to_mrkdwn(text: &str) -> String {
+ static BOLD_RE: LazyLock =
+ LazyLock::new(|| regex::Regex::new(r"\*\*(.+?)\*\*").unwrap());
+ static ITALIC_RE: LazyLock =
+ LazyLock::new(|| regex::Regex::new(r"\*([^*]+?)\*").unwrap());
+ static LINK_RE: LazyLock =
+ LazyLock::new(|| regex::Regex::new(r"\[([^\]]+)\]\(([^)]+)\)").unwrap());
+ static HEADING_RE: LazyLock =
+ LazyLock::new(|| regex::Regex::new(r"(?m)^#{1,6}\s+(.+)$").unwrap());
+ static CODE_BLOCK_LANG_RE: LazyLock =
+ LazyLock::new(|| regex::Regex::new(r"```\w+\n").unwrap());
+
+ // Order: bold first (** β placeholder), then italic (* β _), then restore bold
+ let text = BOLD_RE.replace_all(text, "\x01$1\x02"); // **bold** β \x01bold\x02
+ let text = ITALIC_RE.replace_all(&text, "_${1}_"); // *italic* β _italic_
+ // Restore bold: \x01bold\x02 β *bold*
+ let text = text.replace(['\x01', '\x02'], "*");
+ let text = LINK_RE.replace_all(&text, "<$2|$1>"); // [text](url) β
+ let text = HEADING_RE.replace_all(&text, "*$1*"); // # heading β *heading*
+ let text = CODE_BLOCK_LANG_RE.replace_all(&text, "```\n"); // ```rust β ```
+ text.into_owned()
+}