diff --git a/src/api.rs b/src/api.rs index 5e17ad7..fc5316a 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use futures_util::StreamExt; @@ -8,14 +8,13 @@ use serde_json::{json, Value}; use crate::error::NoteDeckError; use crate::models::{ - AuthResult, ChatMessage, CreateNoteParams, NormalizedDriveFile, NormalizedNote, - NormalizedNoteReaction, NormalizedNotification, NormalizedUser, NormalizedUserDetail, - RawCreateNoteResponse, RawDriveFile, RawEmojisResponse, RawMiAuthResponse, RawNote, - RawNoteReaction, RawNotification, Antenna, Channel, Clip, RawUser, RawUserDetail, + Antenna, AuthResult, Channel, ChatMessage, ChatUser, Clip, CreateNoteParams, + NormalizedDriveFile, NormalizedNote, NormalizedNoteReaction, NormalizedNotification, + NormalizedUser, NormalizedUserDetail, RawCreateNoteResponse, RawDriveFile, RawEmojisResponse, + RawMiAuthResponse, RawNote, RawNoteReaction, RawNotification, RawUser, RawUserDetail, SearchOptions, ServerEmoji, TimelineOptions, TimelineType, UserList, }; - /// Maximum response body size (50 MB) to prevent memory exhaustion from malicious servers. const MAX_RESPONSE_BYTES: usize = 50 * 1024 * 1024; @@ -94,7 +93,8 @@ impl MisskeyClient { }); } } - let mut buf = Vec::with_capacity(content_len.unwrap_or(4096).min(MAX_RESPONSE_BYTES as u64) as usize); + let mut buf = + Vec::with_capacity(content_len.unwrap_or(4096).min(MAX_RESPONSE_BYTES as u64) as usize); let mut stream = res.bytes_stream(); while let Some(chunk) = stream.next().await { let chunk = chunk.map_err(NoteDeckError::from)?; @@ -181,7 +181,11 @@ impl MisskeyClient { ) -> Result, NoteDeckError> { let endpoint = timeline_type.api_endpoint(); let mut params = json!({ "limit": options.limit() }); - apply_pagination(&mut params, options.since_id.as_deref(), options.until_id.as_deref()); + apply_pagination( + &mut params, + options.since_id.as_deref(), + options.until_id.as_deref(), + ); if let Some(ref f) = options.filters { if let Some(v) = f.with_renotes { params["withRenotes"] = json!(v); @@ -219,7 +223,9 @@ impl MisskeyClient { host: &str, token: &str, ) -> Result, NoteDeckError> { - let data = self.request(host, token, "users/lists/list", json!({})).await?; + let data = self + .request(host, token, "users/lists/list", json!({})) + .await?; let lists: Vec = serde_json::from_value(data)?; Ok(lists) } @@ -229,7 +235,9 @@ impl MisskeyClient { host: &str, token: &str, ) -> Result, NoteDeckError> { - let data = self.request(host, token, "antennas/list", json!({})).await?; + let data = self + .request(host, token, "antennas/list", json!({})) + .await?; let antennas: Vec = serde_json::from_value(data)?; Ok(antennas) } @@ -290,9 +298,7 @@ impl MisskeyClient { limit: i64, ) -> Result, NoteDeckError> { let params = json!({ "limit": limit }); - let data = self - .request(host, token, "notes/featured", params) - .await?; + let data = self.request(host, token, "notes/featured", params).await?; let raw: Vec = serde_json::from_value(data)?; Ok(raw .into_iter() @@ -300,11 +306,7 @@ impl MisskeyClient { .collect()) } - pub async fn get_clips( - &self, - host: &str, - token: &str, - ) -> Result, NoteDeckError> { + pub async fn get_clips(&self, host: &str, token: &str) -> Result, NoteDeckError> { let data = self.request(host, token, "clips/list", json!({})).await?; let clips: Vec = serde_json::from_value(data)?; Ok(clips) @@ -339,7 +341,12 @@ impl MisskeyClient { host: &str, token: &str, ) -> Result, NoteDeckError> { - let search_fut = self.request(host, token, "channels/search", json!({"query": "", "limit": 100})); + let search_fut = self.request( + host, + token, + "channels/search", + json!({"query": "", "limit": 100}), + ); let featured_fut = self.request(host, token, "channels/featured", json!({})); let (followed, favorites, owned, featured, search) = if token.is_empty() { @@ -360,7 +367,11 @@ impl MisskeyClient { let mut channels = Vec::with_capacity(128); // User's own channels first, then public channels - for data in [followed, favorites, owned, featured, search].into_iter().flatten().flatten() { + for data in [followed, favorites, owned, featured, search] + .into_iter() + .flatten() + .flatten() + { if let Ok(list) = serde_json::from_value::>(data) { for ch in list { if seen.insert(ch.id.clone()) { @@ -500,8 +511,7 @@ impl MisskeyClient { } let data = self.request(host, token, "notes/create", body).await?; - let raw: RawCreateNoteResponse = - serde_json::from_value(data)?; + let raw: RawCreateNoteResponse = serde_json::from_value(data)?; Ok(raw.created_note.normalize(account_id, host)) } @@ -597,6 +607,7 @@ impl MisskeyClient { Ok(()) } + #[allow(clippy::too_many_arguments)] pub async fn upload_file( &self, host: &str, @@ -682,13 +693,8 @@ impl MisskeyClient { token: &str, note_id: &str, ) -> Result<(), NoteDeckError> { - self.request( - host, - token, - "notes/delete", - json!({ "noteId": note_id }), - ) - .await?; + self.request(host, token, "notes/delete", json!({ "noteId": note_id })) + .await?; Ok(()) } @@ -758,7 +764,12 @@ impl MisskeyClient { let palettes = palettes_val.as_array()?; let first = palettes.first()?; let emojis = first.get("emojis")?.as_array()?; - Some(emojis.iter().filter_map(|v| v.as_str().map(String::from)).collect()) + Some( + emojis + .iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect(), + ) } pub async fn get_user_notes( @@ -770,7 +781,11 @@ impl MisskeyClient { options: TimelineOptions, ) -> Result, NoteDeckError> { let mut params = json!({ "userId": user_id, "limit": options.limit() }); - apply_pagination(&mut params, options.since_id.as_deref(), options.until_id.as_deref()); + apply_pagination( + &mut params, + options.since_id.as_deref(), + options.until_id.as_deref(), + ); let data = self.request(host, token, "users/notes", params).await?; let raw: Vec = serde_json::from_value(data)?; Ok(raw @@ -788,7 +803,11 @@ impl MisskeyClient { options: SearchOptions, ) -> Result, NoteDeckError> { let mut params = json!({ "query": query, "limit": options.limit() }); - apply_pagination(&mut params, options.since_id.as_deref(), options.until_id.as_deref()); + apply_pagination( + &mut params, + options.since_id.as_deref(), + options.until_id.as_deref(), + ); if let Some(d) = options.since_date { params["sinceDate"] = json!(d); } @@ -811,12 +830,13 @@ impl MisskeyClient { options: TimelineOptions, ) -> Result, NoteDeckError> { let mut params = json!({ "limit": options.limit() }); - apply_pagination(&mut params, options.since_id.as_deref(), options.until_id.as_deref()); - let data = self - .request(host, token, "i/notifications", params) - .await?; - let raw: Vec = - serde_json::from_value(data)?; + apply_pagination( + &mut params, + options.since_id.as_deref(), + options.until_id.as_deref(), + ); + let data = self.request(host, token, "i/notifications", params).await?; + let raw: Vec = serde_json::from_value(data)?; Ok(raw .into_iter() .map(|n| n.normalize(account_id, host)) @@ -831,7 +851,11 @@ impl MisskeyClient { options: TimelineOptions, ) -> Result, NoteDeckError> { let mut params = json!({ "limit": options.limit() }); - apply_pagination(&mut params, options.since_id.as_deref(), options.until_id.as_deref()); + apply_pagination( + &mut params, + options.since_id.as_deref(), + options.until_id.as_deref(), + ); let data = self .request(host, token, "i/notifications-grouped", params) .await?; @@ -1068,8 +1092,13 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "following/create", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "following/create", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } @@ -1079,8 +1108,13 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "following/delete", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "following/delete", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } @@ -1090,8 +1124,13 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "following/invalidate", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "following/invalidate", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } @@ -1101,8 +1140,13 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "following/requests/accept", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "following/requests/accept", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } @@ -1112,17 +1156,18 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "following/requests/reject", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "following/requests/reject", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } /// Fetch server meta information. - pub async fn get_meta( - &self, - host: &str, - token: &str, - ) -> Result { + pub async fn get_meta(&self, host: &str, token: &str) -> Result { self.request(host, token, "meta", json!({})).await } @@ -1266,11 +1311,7 @@ impl MisskeyClient { // --- Unread chat --- - pub async fn get_unread_chat( - &self, - host: &str, - token: &str, - ) -> Result { + pub async fn get_unread_chat(&self, host: &str, token: &str) -> Result { let data = self .request(host, token, "messaging/unread", json!({})) .await?; @@ -1279,11 +1320,7 @@ impl MisskeyClient { // --- Self (current user) --- - pub async fn get_self( - &self, - host: &str, - token: &str, - ) -> Result { + pub async fn get_self(&self, host: &str, token: &str) -> Result { self.request(host, token, "i", json!({})).await } @@ -1327,8 +1364,13 @@ impl MisskeyClient { token: &str, file_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "drive/files/delete", json!({ "fileId": file_id })) - .await?; + self.request( + host, + token, + "drive/files/delete", + json!({ "fileId": file_id }), + ) + .await?; Ok(()) } @@ -1340,8 +1382,13 @@ impl MisskeyClient { token: &str, limit: i64, ) -> Result { - self.request(host, token, "following/requests/list", json!({ "limit": limit })) - .await + self.request( + host, + token, + "following/requests/list", + json!({ "limit": limit }), + ) + .await } // --- Explore (users/roles) --- @@ -1371,11 +1418,7 @@ impl MisskeyClient { self.request(host, token, "users", params).await } - pub async fn get_roles( - &self, - host: &str, - token: &str, - ) -> Result { + pub async fn get_roles(&self, host: &str, token: &str) -> Result { self.request(host, token, "roles/list", json!({})).await } @@ -1539,19 +1582,11 @@ impl MisskeyClient { // --- Server stats --- - pub async fn get_server_stats( - &self, - host: &str, - token: &str, - ) -> Result { + pub async fn get_server_stats(&self, host: &str, token: &str) -> Result { self.request(host, token, "stats", json!({})).await } - pub async fn get_meta_detail( - &self, - host: &str, - token: &str, - ) -> Result { + pub async fn get_meta_detail(&self, host: &str, token: &str) -> Result { self.request(host, token, "meta", json!({ "detail": true })) .await } @@ -1747,9 +1782,7 @@ impl MisskeyClient { if room { params["room"] = json!(true); } - let data = self - .request(host, token, "chat/history", params) - .await?; + let data = self.request(host, token, "chat/history", params).await?; let messages: Vec = serde_json::from_value(data)?; Ok(messages) } @@ -1771,7 +1804,9 @@ impl MisskeyClient { let data = self .request(host, token, "chat/messages/user-timeline", params) .await?; - let messages: Vec = serde_json::from_value(data)?; + let mut messages: Vec = serde_json::from_value(data)?; + self.hydrate_chat_message_users(host, token, &mut messages) + .await; Ok(messages) } @@ -1792,10 +1827,83 @@ impl MisskeyClient { let data = self .request(host, token, "chat/messages/room-timeline", params) .await?; - let messages: Vec = serde_json::from_value(data)?; + let mut messages: Vec = serde_json::from_value(data)?; + self.hydrate_chat_message_users(host, token, &mut messages) + .await; Ok(messages) } + /// `users/show?userIds=[...]` の bulk 取得。 + /// Misskey は `userIds` 配列で 1 回の API 呼び出しで複数ユーザーを返す。 + /// 空配列なら API を叩かず即 `Ok(vec![])` で return。 + pub async fn get_users_bulk( + &self, + host: &str, + token: &str, + user_ids: &[String], + ) -> Result, NoteDeckError> { + if user_ids.is_empty() { + return Ok(vec![]); + } + let data = self + .request(host, token, "users/show", json!({ "userIds": user_ids })) + .await?; + let users: Vec = serde_json::from_value(data)?; + Ok(users) + } + + /// `chat/messages/{user|room}-timeline` と WS chat:message が + /// Misskey 本家の Lite packer 固定で `fromUser`/`toUser` を含まない + /// (#460) のを補完する。null になっている `from_user` / `to_user` を + /// `users/show?userIds=[...]` で 1 リクエストにまとめて hydrate する。 + /// 失敗しても null のまま継続する best-effort 動作。 + pub(crate) async fn hydrate_chat_message_users( + &self, + host: &str, + token: &str, + messages: &mut [ChatMessage], + ) { + let mut needed: HashSet = HashSet::new(); + for m in messages.iter() { + if m.from_user.is_none() { + needed.insert(m.from_user_id.clone()); + } + if let Some(uid) = &m.to_user_id { + if m.to_user.is_none() { + needed.insert(uid.clone()); + } + } + } + if needed.is_empty() { + return; + } + let user_ids: Vec = needed.into_iter().collect(); + let users = match self.get_users_bulk(host, token, &user_ids).await { + Ok(u) => u, + Err(e) => { + tracing::warn!(error = %e, "failed to hydrate chat users"); + return; + } + }; + let user_map: HashMap = + users.into_iter().map(|u| (u.id.clone(), u)).collect(); + for m in messages.iter_mut() { + if m.from_user.is_none() { + if let Some(u) = user_map.get(&m.from_user_id) { + m.from_user = Some(u.clone()); + } + } + let to_uid = m.to_user_id.clone(); + if let Some(uid) = to_uid { + if m.to_user.is_none() { + if let Some(u) = user_map.get(&uid) { + m.to_user = Some(u.clone()); + } + } + } + } + } + pub async fn create_chat_message_to_user( &self, host: &str, @@ -1992,8 +2100,13 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "renote-mute/create", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "renote-mute/create", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } @@ -2003,8 +2116,13 @@ impl MisskeyClient { token: &str, user_id: &str, ) -> Result<(), NoteDeckError> { - self.request(host, token, "renote-mute/delete", json!({ "userId": user_id })) - .await?; + self.request( + host, + token, + "renote-mute/delete", + json!({ "userId": user_id }), + ) + .await?; Ok(()) } @@ -2183,7 +2301,6 @@ impl MisskeyClient { let meta: Value = serde_json::from_str(&text)?; Ok(meta) } - } #[cfg(test)] @@ -2252,10 +2369,9 @@ mod tests { let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/notes/show")) - .respond_with( - ResponseTemplate::new(404) - .set_body_json(json!({"error": {"message": "No such note", "code": "NO_SUCH_NOTE"}})), - ) + .respond_with(ResponseTemplate::new(404).set_body_json( + json!({"error": {"message": "No such note", "code": "NO_SUCH_NOTE"}}), + )) .mount(&server) .await; @@ -2309,13 +2425,10 @@ mod tests { let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/notes/timeline")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(json!([ - raw_note_json("n1", "Hello"), - raw_note_json("n2", "World"), - ])), - ) + .respond_with(ResponseTemplate::new(200).set_body_json(json!([ + raw_note_json("n1", "Hello"), + raw_note_json("n2", "World"), + ]))) .mount(&server) .await; @@ -2595,8 +2708,7 @@ mod tests { Mock::given(method("POST")) .and(path("/api/notes/children")) .respond_with( - ResponseTemplate::new(200) - .set_body_json(json!([raw_note_json("r1", "reply")])), + ResponseTemplate::new(200).set_body_json(json!([raw_note_json("r1", "reply")])), ) .mount(&server) .await; @@ -2620,7 +2732,11 @@ mod tests { .await; let client = MisskeyClient::with_base_url(&server.uri()); - let scope = vec!["client".to_string(), "preferences".to_string(), "sync".to_string()]; + let scope = vec![ + "client".to_string(), + "preferences".to_string(), + "sync".to_string(), + ]; let result = client .get_registry_value("h", "token", &scope, "theme:dark") .await @@ -2661,7 +2777,11 @@ mod tests { .await; let client = MisskeyClient::with_base_url(&server.uri()); - let scope = vec!["client".to_string(), "preferences".to_string(), "sync".to_string()]; + let scope = vec![ + "client".to_string(), + "preferences".to_string(), + "sync".to_string(), + ]; client .set_registry_value("h", "token", &scope, "theme:dark", json!("my-theme")) .await @@ -2694,14 +2814,19 @@ mod tests { let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/i/registry/keys-with-type")) - .respond_with(ResponseTemplate::new(200).set_body_json( - json!({"theme:dark": "string", "plugins": "array"}), - )) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({"theme:dark": "string", "plugins": "array"})), + ) .mount(&server) .await; let client = MisskeyClient::with_base_url(&server.uri()); - let scope = vec!["client".to_string(), "preferences".to_string(), "sync".to_string()]; + let scope = vec![ + "client".to_string(), + "preferences".to_string(), + "sync".to_string(), + ]; let result = client .list_registry_keys("h", "token", &scope) .await @@ -2709,4 +2834,174 @@ mod tests { assert_eq!(result.get("theme:dark").map(String::as_str), Some("string")); assert_eq!(result.get("plugins").map(String::as_str), Some("array")); } + + // --- Chat user hydration (#460) --- + + fn null_user_chat_message_json(id: &str, from: &str, to: &str) -> Value { + json!({ + "id": id, + "createdAt": "2025-01-01T00:00:00.000Z", + "fromUserId": from, + "fromUser": null, + "toUserId": to, + "toUser": null, + "toRoomId": null, + "toRoom": null, + "text": "hi", + "fileId": null, + "file": null, + "isRead": false, + "reactions": [] + }) + } + + fn user_show_response_for(ids: &[(&str, &str)]) -> Value { + let arr: Vec = ids + .iter() + .map(|(id, name)| { + json!({ + "id": id, + "username": name, + "name": name, + "host": null, + "avatarUrl": format!("https://example.com/{name}.png"), + "emojis": {} + }) + }) + .collect(); + Value::Array(arr) + } + + #[tokio::test] + async fn get_users_bulk_returns_users() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/api/users/show")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(user_show_response_for(&[("u1", "alice"), ("u2", "bob")])), + ) + .mount(&server) + .await; + + let client = MisskeyClient::with_base_url(&server.uri()); + let users = client + .get_users_bulk("h", "token", &["u1".to_string(), "u2".to_string()]) + .await + .unwrap(); + assert_eq!(users.len(), 2); + assert_eq!(users[0].username, "alice"); + assert_eq!(users[1].username, "bob"); + } + + #[tokio::test] + async fn get_users_bulk_empty_input_returns_empty_without_request() { + let client = MisskeyClient::with_base_url("http://127.0.0.1:1"); + let users = client.get_users_bulk("h", "token", &[]).await.unwrap(); + assert!(users.is_empty()); + } + + #[tokio::test] + async fn get_chat_user_messages_hydrates_null_users() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/api/chat/messages/user-timeline")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!([ + null_user_chat_message_json("m1", "u_other", "u_self"), + null_user_chat_message_json("m2", "u_self", "u_other"), + ]))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/api/users/show")) + .respond_with( + ResponseTemplate::new(200).set_body_json(user_show_response_for(&[ + ("u_self", "me"), + ("u_other", "they"), + ])), + ) + .expect(1) + .mount(&server) + .await; + + let client = MisskeyClient::with_base_url(&server.uri()); + let msgs = client + .get_chat_user_messages("h", "token", "u_other", 30, None, None) + .await + .unwrap(); + assert_eq!(msgs.len(), 2); + assert_eq!(msgs[0].from_user.as_ref().unwrap().username, "they"); + assert_eq!(msgs[0].to_user.as_ref().unwrap().username, "me"); + assert_eq!(msgs[1].from_user.as_ref().unwrap().username, "me"); + assert_eq!(msgs[1].to_user.as_ref().unwrap().username, "they"); + } + + #[tokio::test] + async fn hydrate_keeps_existing_user_and_skips_show() { + let server = MockServer::start().await; + // user-show は呼ばれないはず (既に fromUser/toUser non-null) + Mock::given(method("POST")) + .and(path("/api/users/show")) + .respond_with(ResponseTemplate::new(500)) + .expect(0) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/api/chat/messages/room-timeline")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!([{ + "id": "m1", + "createdAt": "2025-01-01T00:00:00.000Z", + "fromUserId": "u1", + "fromUser": { + "id": "u1", "username": "alice", "name": "Alice", + "host": null, "avatarUrl": null, "emojis": {} + }, + "toUserId": null, + "toUser": null, + "toRoomId": "r1", + "toRoom": {"id": "r1", "name": "Room", "description": null}, + "text": "hi", + "fileId": null, + "file": null, + "isRead": null, + "reactions": [] + }]))) + .mount(&server) + .await; + + let client = MisskeyClient::with_base_url(&server.uri()); + let msgs = client + .get_chat_room_messages("h", "token", "r1", 30, None, None) + .await + .unwrap(); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].from_user.as_ref().unwrap().username, "alice"); + } + + #[tokio::test] + async fn hydrate_swallows_users_show_error() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/api/chat/messages/user-timeline")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!([ + null_user_chat_message_json("m1", "u_other", "u_self"), + ]))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/api/users/show")) + .respond_with(ResponseTemplate::new(500)) + .mount(&server) + .await; + + let client = MisskeyClient::with_base_url(&server.uri()); + let msgs = client + .get_chat_user_messages("h", "token", "u_other", 30, None, None) + .await + .unwrap(); + // 失敗時は null のまま。panic せず messages は返る。 + assert_eq!(msgs.len(), 1); + assert!(msgs[0].from_user.is_none()); + assert!(msgs[0].to_user.is_none()); + } } diff --git a/src/streaming.rs b/src/streaming.rs index cc63dac..4e3cb1f 100644 --- a/src/streaming.rs +++ b/src/streaming.rs @@ -54,10 +54,7 @@ impl FrontendEmitter for EventBusEmitter { /// Emit to FrontendEmitter only (status events, polling, etc.) macro_rules! emit_or_log { ($emitter:expr, $event:expr, $payload:expr) => { - $emitter.emit( - $event, - serde_json::to_value(&$payload).unwrap_or_default(), - ); + $emitter.emit($event, serde_json::to_value(&$payload).unwrap_or_default()); }; } @@ -73,7 +70,6 @@ macro_rules! emit_event { }}; } - const WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); fn ws_config() -> WebSocketConfig { @@ -194,10 +190,20 @@ pub struct StreamStatusEvent { // --- Internal commands sent to the WebSocket task --- enum WsCommand { - Subscribe { channel: String, id: String, params: Option }, - Unsubscribe { id: String }, - SubNote { id: String }, - UnsubNote { id: String }, + Subscribe { + channel: String, + id: String, + params: Option, + }, + Unsubscribe { + id: String, + }, + SubNote { + id: String, + }, + UnsubNote { + id: String, + }, Shutdown, } @@ -298,12 +304,18 @@ impl StreamingManager { let event_bus = self.event_bus.clone(); let db = self.db.clone(); + let api_client = self.api_client.clone(); + let host_owned = host.to_string(); + let token_owned = token.to_string(); let task = tokio::spawn(async move { connection_task( emitter, event_bus, db, + api_client, account_id_owned, + host_owned, + token_owned, url_owned, ws_stream, cmd_rx, @@ -321,10 +333,14 @@ impl StreamingManager { }, ); - emit_or_log!(self.emitter, "stream-status", StreamStatusEvent { - account_id: account_id.to_string(), - state: "connected".to_string(), - }); + emit_or_log!( + self.emitter, + "stream-status", + StreamStatusEvent { + account_id: account_id.to_string(), + state: "connected".to_string(), + } + ); Ok(()) } @@ -354,10 +370,14 @@ impl StreamingManager { let mut subs = self.subscriptions.write().await; subs.retain(|_, info| info.account_id != account_id); - emit_or_log!(self.emitter, "stream-status", StreamStatusEvent { - account_id: account_id.to_string(), - state: "disconnected".to_string(), - }); + emit_or_log!( + self.emitter, + "stream-status", + StreamStatusEvent { + account_id: account_id.to_string(), + state: "disconnected".to_string(), + } + ); } /// Switch between realtime (WebSocket) and polling (HTTP) mode. @@ -396,10 +416,14 @@ impl StreamingManager { let interval = Duration::from_millis(interval_ms.unwrap_or(15_000)); self.start_polling(account_id, host, token, interval).await; - emit_or_log!(self.emitter, "stream-status", StreamStatusEvent { - account_id: account_id.to_string(), - state: "connected".to_string(), - }); + emit_or_log!( + self.emitter, + "stream-status", + StreamStatusEvent { + account_id: account_id.to_string(), + state: "connected".to_string(), + } + ); } _ => { return Err(NoteDeckError::InvalidInput(format!("unknown mode: {mode}"))); @@ -408,13 +432,7 @@ impl StreamingManager { Ok(()) } - async fn start_polling( - &self, - account_id: &str, - host: &str, - token: &str, - interval: Duration, - ) { + async fn start_polling(&self, account_id: &str, host: &str, token: &str, interval: Duration) { let mut polls = self.poll_connections.lock().await; // Stop existing polling task @@ -474,7 +492,8 @@ impl StreamingManager { let params = list_id.as_ref().map(|id| json!({ "listId": id })); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, &channel, &sub_id, params.clone()).await?; + self.send_subscribe(account_id, &channel, &sub_id, params.clone()) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -502,7 +521,8 @@ impl StreamingManager { let params = Some(json!({ "antennaId": antenna_id })); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, "antenna", &sub_id, params.clone()).await?; + self.send_subscribe(account_id, "antenna", &sub_id, params.clone()) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -530,7 +550,8 @@ impl StreamingManager { let params = Some(json!({ "channelId": channel_id })); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, "channel", &sub_id, params.clone()).await?; + self.send_subscribe(account_id, "channel", &sub_id, params.clone()) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -558,7 +579,8 @@ impl StreamingManager { let params = Some(json!({ "roleId": role_id })); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, "roleTimeline", &sub_id, params.clone()).await?; + self.send_subscribe(account_id, "roleTimeline", &sub_id, params.clone()) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -586,7 +608,8 @@ impl StreamingManager { let params = Some(json!({ "otherId": other_id })); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, "chatUser", &sub_id, params.clone()).await?; + self.send_subscribe(account_id, "chatUser", &sub_id, params.clone()) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -614,7 +637,8 @@ impl StreamingManager { let params = Some(json!({ "roomId": room_id })); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, "chatRoom", &sub_id, params.clone()).await?; + self.send_subscribe(account_id, "chatRoom", &sub_id, params.clone()) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -637,7 +661,8 @@ impl StreamingManager { let sub_id = uuid::Uuid::new_v4().to_string(); let host = self.get_host(account_id).await?; - self.send_subscribe(account_id, "main", &sub_id, None).await?; + self.send_subscribe(account_id, "main", &sub_id, None) + .await?; let mut subs = self.subscriptions.write().await; subs.insert( @@ -656,7 +681,11 @@ impl StreamingManager { Ok(sub_id) } - pub async fn unsubscribe(&self, account_id: &str, subscription_id: &str) -> Result<(), NoteDeckError> { + pub async fn unsubscribe( + &self, + account_id: &str, + subscription_id: &str, + ) -> Result<(), NoteDeckError> { // Send unsubscribe to WebSocket if in realtime mode let conns = self.connections.lock().await; if let Some(handle) = conns.get(account_id) { @@ -687,9 +716,7 @@ impl StreamingManager { let mut subs = self.subscriptions.write().await; let info = subs .get_mut(subscription_id) - .ok_or_else(|| { - NoteDeckError::InvalidInput("subscription not found".to_string()) - })?; + .ok_or_else(|| NoteDeckError::InvalidInput("subscription not found".to_string()))?; if info.account_id != account_id { return Err(NoteDeckError::InvalidInput( "subscription account mismatch".to_string(), @@ -720,9 +747,7 @@ impl StreamingManager { let subs = self.subscriptions.read().await; let info = subs .get(subscription_id) - .ok_or_else(|| { - NoteDeckError::InvalidInput("subscription not found".to_string()) - })?; + .ok_or_else(|| NoteDeckError::InvalidInput("subscription not found".to_string()))?; if info.account_id != account_id { return Err(NoteDeckError::InvalidInput( "subscription account mismatch".to_string(), @@ -734,7 +759,8 @@ impl StreamingManager { return Ok(()); } - self.send_subscribe(account_id, &channel, subscription_id, params).await?; + self.send_subscribe(account_id, &channel, subscription_id, params) + .await?; let mut subs = self.subscriptions.write().await; if let Some(info) = subs.get_mut(subscription_id) { @@ -749,7 +775,9 @@ impl StreamingManager { if let Some(handle) = conns.get(account_id) { return handle .cmd_tx - .send(WsCommand::SubNote { id: note_id.to_string() }) + .send(WsCommand::SubNote { + id: note_id.to_string(), + }) .map_err(|_| NoteDeckError::ConnectionClosed); } drop(conns); @@ -769,7 +797,9 @@ impl StreamingManager { if let Some(handle) = conns.get(account_id) { return handle .cmd_tx - .send(WsCommand::UnsubNote { id: note_id.to_string() }) + .send(WsCommand::UnsubNote { + id: note_id.to_string(), + }) .map_err(|_| NoteDeckError::ConnectionClosed); } drop(conns); @@ -831,15 +861,10 @@ impl StreamingManager { } } -type WsStream = tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, ->; +type WsStream = + tokio_tungstenite::WebSocketStream>; type WsRead = futures_util::stream::SplitStream; -type WsWrite = Arc< - Mutex< - futures_util::stream::SplitSink, - >, ->; +type WsWrite = Arc>>; enum WsExitReason { Disconnected, @@ -854,7 +879,10 @@ async fn connection_task( emitter: Arc, event_bus: Arc, db: Arc, + api_client: Arc, account_id: String, + host: String, + token: String, url: String, initial_ws: WsStream, mut cmd_rx: mpsc::UnboundedReceiver, @@ -863,17 +891,33 @@ async fn connection_task( let mut backoff_secs: u64 = 1; // Run the first session with the already-connected WebSocket - let reason = run_ws_session(&emitter, &event_bus, &db, &account_id, initial_ws, &mut cmd_rx, &subscriptions).await; + let reason = run_ws_session( + &emitter, + &event_bus, + &db, + &api_client, + &account_id, + &host, + &token, + initial_ws, + &mut cmd_rx, + &subscriptions, + ) + .await; if matches!(reason, WsExitReason::Shutdown) { return; } // Reconnection loop loop { - emit_or_log!(emitter, "stream-status", StreamStatusEvent { - account_id: account_id.clone(), - state: "reconnecting".to_string(), - }); + emit_or_log!( + emitter, + "stream-status", + StreamStatusEvent { + account_id: account_id.clone(), + state: "reconnecting".to_string(), + } + ); // Wait with backoff, but listen for Shutdown during the wait let sleep = tokio::time::sleep(Duration::from_secs(backoff_secs)); @@ -908,16 +952,23 @@ async fn connection_task( Ok(Ok((ws_stream, _))) => { backoff_secs = 1; // Reset backoff on success - emit_or_log!(emitter, "stream-status", StreamStatusEvent { - account_id: account_id.clone(), - state: "connected".to_string(), - }); + emit_or_log!( + emitter, + "stream-status", + StreamStatusEvent { + account_id: account_id.clone(), + state: "connected".to_string(), + } + ); let reason = run_ws_session( &emitter, &event_bus, &db, + &api_client, &account_id, + &host, + &token, ws_stream, &mut cmd_rx, &subscriptions, @@ -941,11 +992,15 @@ async fn connection_task( } /// Run a single WebSocket session. Re-subscribes existing channels, then enters the message loop. +#[allow(clippy::too_many_arguments)] async fn run_ws_session( emitter: &Arc, event_bus: &Arc, db: &Arc, + api_client: &Arc, account_id: &str, + host: &str, + token: &str, ws_stream: WsStream, cmd_rx: &mut mpsc::UnboundedReceiver, subscriptions: &Arc>>, @@ -977,7 +1032,20 @@ async fn run_ws_session( } } - ws_loop(emitter, event_bus, db, account_id, read, write, cmd_rx, subscriptions).await + ws_loop( + emitter, + event_bus, + db, + api_client, + account_id, + host, + token, + read, + write, + cmd_rx, + subscriptions, + ) + .await } #[allow(clippy::too_many_arguments)] @@ -985,7 +1053,10 @@ async fn ws_loop( emitter: &Arc, event_bus: &Arc, db: &Arc, + api_client: &Arc, account_id: &str, + host: &str, + token: &str, mut read: WsRead, write: WsWrite, cmd_rx: &mut mpsc::UnboundedReceiver, @@ -999,13 +1070,16 @@ async fn ws_loop( msg = read.next() => { match msg { Some(Ok(Message::Text(text))) => { - let emitter = emitter.clone(); - let event_bus = event_bus.clone(); - let db = db.clone(); - let account_id = account_id.to_string(); - let subscriptions = subscriptions.clone(); + let emitter_c = emitter.clone(); + let event_bus_c = event_bus.clone(); + let db_c = db.clone(); + let api_client_c = api_client.clone(); + let account_id_c = account_id.to_string(); + let host_c = host.to_string(); + let token_c = token.to_string(); + let subscriptions_c = subscriptions.clone(); tokio::spawn(async move { - handle_ws_message(&*emitter, &event_bus, &db, &account_id, &text, &subscriptions).await; + handle_ws_message(&*emitter_c, &event_bus_c, &db_c, &api_client_c, &account_id_c, &host_c, &token_c, &text, &subscriptions_c).await; }); } Some(Ok(Message::Ping(data))) => { @@ -1076,11 +1150,15 @@ async fn ws_loop( } } +#[allow(clippy::too_many_arguments)] async fn handle_ws_message( emitter: &dyn FrontendEmitter, event_bus: &EventBus, db: &Arc, + api_client: &Arc, account_id: &str, + account_host: &str, + account_token: &str, text: &str, subscriptions: &Arc>>, ) { @@ -1097,8 +1175,16 @@ async fn handle_ws_message( // Note Capture: { "type": "noteUpdated", "body": { "id": "...", "type": "...", "body": ... } } if msg_type == "noteUpdated" { if let Some(mut body) = msg.get_mut("body").map(Value::take) { - let note_id = body.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_owned(); - let update_type = body.get("type").and_then(|v| v.as_str()).unwrap_or_default().to_owned(); + let note_id = body + .get("id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_owned(); + let update_type = body + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_owned(); let update_body = body.get_mut("body").map(Value::take).unwrap_or_default(); let payload = StreamNoteCaptureEvent { account_id: account_id.to_string(), @@ -1106,7 +1192,13 @@ async fn handle_ws_message( update_type, body: update_body, }; - emit_event!(emitter, event_bus, "note-capture-updated", "stream-note-capture-updated", payload); + emit_event!( + emitter, + event_bus, + "note-capture-updated", + "stream-note-capture-updated", + payload + ); } return; } @@ -1164,10 +1256,21 @@ async fn handle_ws_message( emit_event!(emitter, event_bus, "note", "stream-note", payload); } } else if is_note_channel && event_type == "noteUpdated" { - let note_id = event_body.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_owned(); - let update_type = event_body.get("type").and_then(|v| v.as_str()).unwrap_or_default().to_owned(); + let note_id = event_body + .get("id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_owned(); + let update_type = event_body + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_owned(); let mut event_body = event_body; - let update_body = event_body.get_mut("body").map(Value::take).unwrap_or_default(); + let update_body = event_body + .get_mut("body") + .map(Value::take) + .unwrap_or_default(); let payload = StreamNoteUpdatedEvent { account_id: account_id.to_string(), subscription_id: sub_id, @@ -1175,7 +1278,13 @@ async fn handle_ws_message( update_type, body: update_body, }; - emit_event!(emitter, event_bus, "note-updated", "stream-note-updated", payload); + emit_event!( + emitter, + event_bus, + "note-updated", + "stream-note-updated", + payload + ); } else if kind == "main" { if event_type == "notification" { if let Ok(raw) = serde_json::from_value::(event_body) { @@ -1185,7 +1294,13 @@ async fn handle_ws_message( subscription_id: sub_id, notification, }; - emit_event!(emitter, event_bus, "notification", "stream-notification", payload); + emit_event!( + emitter, + event_bus, + "notification", + "stream-notification", + payload + ); } } else if event_type == "mention" || event_type == "reply" { // Serialize event_body as main-event first, then try parsing as mention @@ -1194,7 +1309,8 @@ async fn handle_ws_message( subscription_id: sub_id.clone(), event_type, body: event_body.clone(), - }).unwrap_or_default(); + }) + .unwrap_or_default(); emitter.emit("stream-main-event", main_data); if let Ok(raw) = serde_json::from_value::(event_body) { @@ -1214,11 +1330,26 @@ async fn handle_ws_message( event_type, body: event_body, }; - emit_event!(emitter, event_bus, main_event_type, "stream-main-event", payload); + emit_event!( + emitter, + event_bus, + main_event_type, + "stream-main-event", + payload + ); } } else if kind == "chat" { if event_type == "message" { - if let Ok(msg) = serde_json::from_value::(event_body) { + if let Ok(mut msg) = serde_json::from_value::(event_body) { + // Misskey 本家の chat:message WS event は Lite packer 固定で + // fromUser/toUser を含まない (#460) ため hydrate する。 + api_client + .hydrate_chat_message_users( + account_host, + account_token, + std::slice::from_mut(&mut msg), + ) + .await; // DB upsert (fire-and-forget)。`account_user_id` は DM partner 計算に必要。 if let Ok(Some(account)) = db.get_account(account_id) { let db = db.clone(); @@ -1379,18 +1510,16 @@ async fn polling_loop( let mut poll_failed = false; for (sub_id, info) in &subs_snapshot { - let state = sub_states.entry(sub_id.clone()).or_insert(PollSubState { - since_id: None, - }); + let state = sub_states + .entry(sub_id.clone()) + .or_insert(PollSubState { since_id: None }); let tl_type = TimelineType::new(&info.timeline_type); - let mut options = TimelineOptions::new( - 30, - state.since_id.clone(), - None, - ); + let mut options = TimelineOptions::new(30, state.since_id.clone(), None); options.list_id = info.params.as_ref().and_then(|p| { - p.get("listId").and_then(|v| v.as_str()).map(|s| s.to_string()) + p.get("listId") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) }); match api_client @@ -1442,7 +1571,7 @@ async fn polling_loop( // Note capture: poll every 2nd cycle (2x interval) poll_count += 1; - if poll_count % 2 == 0 { + if poll_count.is_multiple_of(2) { let note_ids: Vec = { let captured = captured_notes.read().await; captured @@ -1462,9 +1591,7 @@ async fn polling_loop( let account_id = account_id.clone(); let note_id = note_id.clone(); async move { - let result = api - .get_note(&host, &token, &account_id, ¬e_id) - .await; + let result = api.get_note(&host, &token, &account_id, ¬e_id).await; (note_id, result) } }) @@ -1496,16 +1623,12 @@ async fn polling_loop( "userId": null, }), }; - emit_or_log!( - emitter, - "stream-note-capture-updated", - payload - ); + emit_or_log!(emitter, "stream-note-capture-updated", payload); } } // Find removed reactions - for (reaction, _) in &old_reactions { + for reaction in old_reactions.keys() { if !new_reactions.contains_key(reaction) { let payload = StreamNoteCaptureEvent { account_id: account_id.clone(), @@ -1516,11 +1639,7 @@ async fn polling_loop( "userId": null, }), }; - emit_or_log!( - emitter, - "stream-note-capture-updated", - payload - ); + emit_or_log!(emitter, "stream-note-capture-updated", payload); } } @@ -1539,10 +1658,14 @@ async fn polling_loop( (1u64 << consecutive_failures.min(5)).min(MAX_POLL_BACKOFF_SECS) }; - emit_or_log!(emitter, "stream-status", StreamStatusEvent { - account_id: account_id.clone(), - state: "reconnecting".to_string(), - }); + emit_or_log!( + emitter, + "stream-status", + StreamStatusEvent { + account_id: account_id.clone(), + state: "reconnecting".to_string(), + } + ); Duration::from_secs(backoff) } else {