From 4f6f82b5edb04d05abab3af2dad8300e85b95356 Mon Sep 17 00:00:00 2001 From: Rafabd1 Date: Sat, 14 Mar 2026 02:09:49 +0000 Subject: [PATCH] fix: send session_end notification before wipe/close (#6) - Add 'end' wire message type sent to peer before shutdown - Handle incoming 'end' messages in receive_loop to cleanly close session - Both close_session and panic_wipe now notify peer before destroying state - Peer receives session_closed event with reason='peer_ended' Co-Authored-By: Oz --- src-tauri/src/commands/session.rs | 37 ++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/src-tauri/src/commands/session.rs b/src-tauri/src/commands/session.rs index 9bb7c40..327fe18 100644 --- a/src-tauri/src/commands/session.rs +++ b/src-tauri/src/commands/session.rs @@ -41,6 +41,11 @@ struct HandshakeAck { t: String, } +#[derive(Serialize, Deserialize)] +struct SessionEnd { + t: String, +} + #[derive(Serialize)] pub struct IdentityInfo { pub b32_addr: String, @@ -436,9 +441,13 @@ async fn receive_loop(app: AppHandle, mut reader: tokio::io::ReadHalf { if let Err(e) = handle_incoming_message(&app, &frame).await { + let err_msg = e.to_string(); #[cfg(debug_assertions)] - log::warn!("message handling error: {}", e); - let _ = e; + log::warn!("message handling error: {}", err_msg); + // If the peer ended the session, stop the receive loop + if err_msg.contains("peer ended session") { + break; + } } } Err(e) => { @@ -462,7 +471,24 @@ struct WireMessage { n: u32, } +/// Generic wire envelope used to inspect the `t` field before full deserialization. +#[derive(Deserialize)] +struct WireEnvelope { + t: String, +} + async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Result<()> { + // Peek at the type field to handle control messages before attempting full parse + let envelope: WireEnvelope = serde_json::from_slice(frame)?; + + // Handle session_end: peer is closing/wiping their session + if envelope.t == "end" { + let state = app.state::(); + *state.session.lock().await = None; + let _ = app.emit("session_closed", serde_json::json!({ "reason": "peer_ended" })); + return Err(anyhow::anyhow!("peer ended session")); + } + let wire: WireMessage = serde_json::from_slice(frame)?; if wire.t != "msg" { return Ok(()); @@ -511,7 +537,9 @@ async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Resul pub async fn close_session(state: State<'_, AppState>) -> Result<(), String> { let mut sess = state.session.lock().await; if let Some(mut s) = sess.take() { - // Shut down write half to signal peer + // Notify peer that we are ending the session before shutting down + let end_msg = serde_json::to_vec(&SessionEnd { t: "end".into() }).unwrap_or_default(); + let _ = write_framed(&mut s.stream_writer, &end_msg).await; let _ = s.stream_writer.shutdown().await; } state.messages.lock().await.clear(); @@ -529,6 +557,9 @@ pub async fn do_panic_wipe(app: AppHandle) { { let mut sess = state.session.lock().await; if let Some(mut s) = sess.take() { + // Notify peer that session is ending before destroying everything + let end_msg = serde_json::to_vec(&SessionEnd { t: "end".into() }).unwrap_or_default(); + let _ = write_framed(&mut s.stream_writer, &end_msg).await; let _ = s.stream_writer.shutdown().await; } }