Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src-tauri/src/commands/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ struct HandshakeAck {
t: String,
}

#[derive(Serialize, Deserialize)]
struct SessionEnd {
t: String,
}

#[derive(Serialize)]
pub struct IdentityInfo {
pub b32_addr: String,
Expand Down Expand Up @@ -436,9 +441,13 @@ async fn receive_loop(app: AppHandle, mut reader: tokio::io::ReadHalf<tokio::net
match read_framed(&mut reader).await {
Ok(frame) => {
if let Err(e) = handle_incoming_message(&app, &frame).await {
let err_msg = e.to_string();
#[cfg(debug_assertions)]
log::warn!("message handling error: {}", e);
let _ = e;
log::warn!("message handling error: {}", err_msg);
// If the peer ended the session, stop the receive loop
if err_msg.contains("peer ended session") {
break;
}
}
}
Err(e) => {
Expand All @@ -462,7 +471,24 @@ struct WireMessage {
n: u32,
}

/// Generic wire envelope used to inspect the `t` field before full deserialization.
#[derive(Deserialize)]
struct WireEnvelope {
t: String,
}

async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Result<()> {
// Peek at the type field to handle control messages before attempting full parse
let envelope: WireEnvelope = serde_json::from_slice(frame)?;

// Handle session_end: peer is closing/wiping their session
if envelope.t == "end" {
let state = app.state::<AppState>();
*state.session.lock().await = None;
let _ = app.emit("session_closed", serde_json::json!({ "reason": "peer_ended" }));
return Err(anyhow::anyhow!("peer ended session"));
}

let wire: WireMessage = serde_json::from_slice(frame)?;
if wire.t != "msg" {
return Ok(());
Expand Down Expand Up @@ -511,7 +537,9 @@ async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Resul
pub async fn close_session(state: State<'_, AppState>) -> Result<(), String> {
let mut sess = state.session.lock().await;
if let Some(mut s) = sess.take() {
// Shut down write half to signal peer
// Notify peer that we are ending the session before shutting down
let end_msg = serde_json::to_vec(&SessionEnd { t: "end".into() }).unwrap_or_default();
let _ = write_framed(&mut s.stream_writer, &end_msg).await;
let _ = s.stream_writer.shutdown().await;
}
state.messages.lock().await.clear();
Expand All @@ -529,6 +557,9 @@ pub async fn do_panic_wipe(app: AppHandle) {
{
let mut sess = state.session.lock().await;
if let Some(mut s) = sess.take() {
// Notify peer that session is ending before destroying everything
let end_msg = serde_json::to_vec(&SessionEnd { t: "end".into() }).unwrap_or_default();
let _ = write_framed(&mut s.stream_writer, &end_msg).await;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid awaiting peer notify in panic wipe path

do_panic_wipe now awaits write_framed before clearing session/identity state, so a stalled or backpressured socket can delay the wipe indefinitely (the write path uses write_all/flush and has no timeout). In the panic-wipe scenario this is a correctness and safety regression: local sensitive state is kept alive longer specifically when the peer/network is unhealthy. Make the end notification non-blocking (e.g., bounded timeout or spawned best-effort task) and perform local wipe immediately.

Useful? React with 👍 / 👎.

let _ = s.stream_writer.shutdown().await;
}
}
Expand Down