Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 133 additions & 9 deletions src/platform/telegram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ fn split_message(text: &str, max_len: usize) -> Vec<String> {
chunks
}

/// Build the static list of slash commands shown in Telegram's "/" menu.
///
/// The descriptions surface to the user via the BotFather command menu.
/// Routing for these commands lives in `handle_message`; this function only
/// publishes their existence to the Telegram client.
pub(crate) fn supported_commands() -> Vec<teloxide::types::BotCommand> {
use teloxide::types::BotCommand;
vec![
BotCommand::new("start", "Show the welcome message and command help"),
BotCommand::new("clear", "Clear the current conversation history"),
BotCommand::new("tools", "List available built-in and MCP tools"),
BotCommand::new("skills", "List loaded skills"),
BotCommand::new("verbose", "Toggle tool-call progress display"),
BotCommand::new("queryrewrite", "Toggle query rewriting for memory search"),
]
}

/// Run the Telegram bot platform
pub async fn run(
agent: Arc<Agent>,
Expand All @@ -53,6 +70,15 @@ pub async fn run(

info!("Starting Telegram platform...");

// Publish the slash-command menu to Telegram so clients show suggestions.
// Best-effort: a network failure here must not block the bot from running.
let commands = supported_commands();
let count = commands.len();
match bot.set_my_commands(commands).await {
Ok(_) => info!("Registered {} Telegram commands", count),
Err(e) => warn!(error = %e, "Failed to register Telegram commands"),
}

let handler = Update::filter_message()
.filter_map(move |msg: Message| {
let user = msg.from.as_ref()?;
Expand Down Expand Up @@ -122,7 +148,7 @@ async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseRe
/tools - List available tools\n\
/skills - List loaded skills\n\
/verbose - Toggle tool call progress display\n\
/query-rewrite - Toggle query rewriting for memory search",
/queryrewrite - Toggle query rewriting for memory search",
);
bot.send_message(msg.chat.id, help)
.parse_mode(ParseMode::MarkdownV2)
Expand Down Expand Up @@ -193,7 +219,10 @@ async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseRe
return Ok(());
}

if text == "/query-rewrite" {
// Accept both the canonical `/queryrewrite` (registered with Telegram —
// Bot API command names cannot contain hyphens) and the legacy
// `/query-rewrite` form for users with existing muscle memory.
if text == "/queryrewrite" || text == "/query-rewrite" {
let current = agent
.memory
.recall("settings", &format!("query_rewrite_enabled_{}", user_id))
Expand Down Expand Up @@ -267,8 +296,12 @@ async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseRe
};

// When verbose is OFF, send a transient "Thinking..." placeholder so the user
// knows the bot is processing. The stream handle will edit it in-place with the
// first LLM tokens, so only one message is ever visible.
// knows the bot is processing. The placeholder is **independent** of the
// streaming output — when the first token arrives it is delivered as a NEW
// message, and the placeholder is deleted by `handle_message` after the
// stream completes (success or error). This keeps the placeholder a
// standalone progress signal rather than a doomed attempt to morph into the
// final answer.
let placeholder_msg_id: Option<teloxide::types::MessageId> = if !verbose_enabled {
match bot.send_message(msg.chat.id, "⏳ Thinking...").await {
Ok(sent) => Some(sent.id),
Expand All @@ -293,8 +326,9 @@ async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseRe
use std::time::{Duration, Instant};

let mut buffer = String::new();
// Seed current_msg_id with the placeholder so the first edit reuses it.
let mut current_msg_id: Option<teloxide::types::MessageId> = placeholder_msg_id;
// The first token always starts a fresh message — the placeholder
// (if any) is owned and deleted by `handle_message` after streaming.
let mut current_msg_id: Option<teloxide::types::MessageId> = None;
let mut last_action = Instant::now();
let mut rx = stream_token_rx;

Expand Down Expand Up @@ -379,10 +413,10 @@ async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseRe
.ok();
}
}
} else if let Some(msg_id) = current_msg_id {
// Edge case: no tokens were streamed but we have a placeholder — delete it
stream_bot.delete_message(stream_chat_id, msg_id).await.ok();
}
// If `buffer` was empty and `current_msg_id` is None, nothing was
// streamed — the placeholder owned by `handle_message` will be cleaned
// up after this task completes.
});

// Build platform-agnostic message
Expand Down Expand Up @@ -415,6 +449,15 @@ async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseRe
// Wait for stream receiver to complete its final edit
stream_handle.await.ok();

// Delete the "Thinking..." placeholder now that the response (or error
// reply below) has been delivered. Best-effort: ignore failures so a
// stale placeholder never blocks reporting the actual outcome.
if let Some(placeholder_id) = placeholder_msg_id {
if let Err(e) = bot.delete_message(msg.chat.id, placeholder_id).await {
tracing::warn!(error = %e, "Failed to delete thinking placeholder");
}
}

if let Err(e) = process_result {
warn!(error = %e, "Agent processing failed");
bot.send_message(msg.chat.id, escape_text(&format!("Error: {:#}", e)))
Expand Down Expand Up @@ -509,4 +552,85 @@ mod tests {
"Zero-width-space placeholder must be removed from stream_handle"
);
}

#[test]
fn test_first_token_does_not_inherit_placeholder_msg_id() {
// The streaming task must seed `current_msg_id` to `None` so the first
// token is delivered as a NEW message rather than editing the
// "Thinking..." placeholder. Source-inspection guard against future
// refactors that re-introduce the seeding behavior.
//
// Construct the bad-pattern needle at runtime from pieces so the test
// body itself never contains the contiguous substring being searched
// for (otherwise the `contains` check would always trip on this very
// test's source).
let source = include_str!("telegram.rs");
let bad_needle = format!(
"current_msg_id: Option<teloxide::types::MessageId> = {}",
"placeholder_msg_id"
);
assert!(
!source.contains(&bad_needle),
"stream_handle must NOT seed current_msg_id with the placeholder id; first token must be a new message"
);
let good_needle = format!(
"let mut current_msg_id: Option<teloxide::types::MessageId> = {};",
"None"
);
assert!(
source.contains(&good_needle),
"stream_handle must initialize current_msg_id to None"
);
}

#[test]
fn test_placeholder_is_deleted_after_streaming() {
// The Thinking placeholder must be cleaned up in `handle_message` after
// `stream_handle.await`, regardless of success/error outcome.
let source = include_str!("telegram.rs");
assert!(
source.contains("Failed to delete thinking placeholder"),
"handle_message must delete the Thinking placeholder after streaming completes"
);
}

#[test]
fn test_supported_commands_lists_user_visible_commands() {
let cmds = supported_commands();
let names: Vec<&str> = cmds.iter().map(|c| c.command.as_str()).collect();
for required in &[
"start",
"clear",
"tools",
"skills",
"verbose",
"queryrewrite",
] {
assert!(
names.contains(required),
"supported_commands missing /{required}: got {names:?}"
);
}
// Telegram BotCommand names must match `[a-z0-9_]{1,32}`.
for c in &cmds {
assert!(
c.command
.chars()
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'),
"command '{}' contains invalid characters for Telegram BotCommand",
c.command
);
assert!(
(1..=32).contains(&c.command.len()),
"command '{}' has invalid length {}",
c.command,
c.command.len()
);
assert!(
!c.description.is_empty(),
"command '{}' is missing a description",
c.command
);
}
}
}
Loading