Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions src-tauri/src/commands/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use base64::{
use serde::{Deserialize, Serialize};
use tauri::{AppHandle, Emitter, Manager, State};
use tokio::io::{split, AsyncWriteExt};
use tokio::time::{timeout, Duration};
use x25519_dalek::{PublicKey, StaticSecret};
use rand::rngs::OsRng;
use zeroize::Zeroize;
Expand Down Expand Up @@ -204,6 +205,10 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) {
match accept_once_raw(&session_id, &sam_addr).await {
Ok((peer_dest, tunnel)) => {
if let Err(e) = handle_incoming(&app, peer_dest, tunnel).await {
// Emit user-visible error on handshake failure
let error_msg = format!("Connection failed: {}", e);
let _ = app.emit("connection_error", error_msg);

#[cfg(debug_assertions)]
log::warn!("incoming session error: {}", e);
let _ = e;
Expand Down Expand Up @@ -256,6 +261,7 @@ async fn read_sam_line_raw(stream: &mut tokio::net::TcpStream) -> anyhow::Result
}

/// Handle an incoming I2P connection: read handshake, compute X3DH, set session.
/// All I/O operations are wrapped in timeouts to prevent indefinite hangs on degraded I2P tunnels.
async fn handle_incoming(
app: &AppHandle,
peer_dest: String,
Expand All @@ -270,8 +276,11 @@ async fn handle_incoming(

let (mut reader, mut writer) = split(tunnel);

// Read HANDSHAKE_INIT
let frame = read_framed(&mut reader).await?;
// Read HANDSHAKE_INIT with timeout (60 seconds given I2P latency)
let frame = timeout(Duration::from_secs(60), read_framed(&mut reader))
.await
.map_err(|_| anyhow::anyhow!("handshake timeout: peer did not send INIT within 60s"))?;

let init: HandshakeInit = serde_json::from_slice(&frame)?;
if init.t != "hi" {
return Err(anyhow::anyhow!("expected handshake init, got type={}", init.t));
Expand All @@ -295,9 +304,11 @@ async fn handle_incoming(
let ratchet = DoubleRatchet::from_root_key(&root_key, false);
root_key.zeroize();

// Send HANDSHAKE_ACK
// Send HANDSHAKE_ACK with timeout (30 seconds - writing should be fast)
let ack = serde_json::to_vec(&HandshakeAck { t: "ack".into() })?;
write_framed(&mut writer, &ack).await?;
timeout(Duration::from_secs(30), write_framed(&mut writer, &ack))
.await
.map_err(|_| anyhow::anyhow!("handshake timeout: failed to send ACK within 30s"))?;

let peer_ik_bytes = <[u8; 32]>::try_from(ik_a_bytes.as_slice())?;

Expand Down Expand Up @@ -384,7 +395,7 @@ pub async fn initiate_session(

let (mut reader, mut writer) = split(tunnel);

// Send HANDSHAKE_INIT
// Send HANDSHAKE_INIT with timeout (30 seconds - writing should be fast)
let ik_hex = {
let id = state.identity.lock().await;
id.as_ref().unwrap().ik_pub_hex()
Expand All @@ -397,14 +408,17 @@ pub async fn initiate_session(
})
.map_err(|e| e.to_string())?;

write_framed(&mut writer, &init_msg)
timeout(Duration::from_secs(30), write_framed(&mut writer, &init_msg))
.await
.map_err(|_| "handshake timeout: failed to send INIT within 30s")?
.map_err(|e| e.to_string())?;

// Wait for ACK
let ack_frame = read_framed(&mut reader)
// Wait for ACK with timeout (60 seconds given I2P latency)
let ack_frame = timeout(Duration::from_secs(60), read_framed(&mut reader))
.await
.map_err(|_| "handshake timeout: peer did not send ACK within 60s")?
.map_err(|e| e.to_string())?;

let ack: HandshakeAck = serde_json::from_slice(&ack_frame).map_err(|e| e.to_string())?;
if ack.t != "ack" {
return Err(format!("unexpected ack type: {}", ack.t));
Expand Down Expand Up @@ -629,4 +643,4 @@ pub async fn get_safety_numbers(state: State<'_, AppState>) -> Result<String, St
keys.ik_public.as_bytes(),
&session.peer_ik_bytes,
))
}
}