From b4901b683ebbef77781245d6914052d16948f54f Mon Sep 17 00:00:00 2001 From: Rafabd1 Date: Sat, 14 Mar 2026 02:11:12 +0000 Subject: [PATCH] fix: add sequence numbers and sender timestamps for message ordering (#7) - Add seq (monotonic sequence number) and ts (sender timestamp) to wire protocol - Track send_seq/recv_seq on ActiveSession for ordering enforcement - Detect out-of-order messages on receive side and log warnings - Use sender timestamp for message display instead of local clock - Sort messages by timestamp in ChatWindow for consistent ordering - Backward compatible: seq/ts fields use serde(default) Co-Authored-By: Oz --- src-tauri/src/commands/messaging.rs | 15 ++++++++++++--- src-tauri/src/commands/session.rs | 30 ++++++++++++++++++++++++++++- src-tauri/src/state.rs | 4 ++++ src/components/ChatWindow.tsx | 8 +++++++- 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src-tauri/src/commands/messaging.rs b/src-tauri/src/commands/messaging.rs index 90dd29d..99a3811 100644 --- a/src-tauri/src/commands/messaging.rs +++ b/src-tauri/src/commands/messaging.rs @@ -22,6 +22,10 @@ struct WireMessage<'a> { id: &'a str, ct: String, n: u32, + /// Sender-side sequence number for ordering enforcement + seq: u64, + /// Sender-side unix timestamp (seconds) for cross-peer consistency + ts: u64, } /// Encrypt and send a message to the active peer. @@ -38,13 +42,16 @@ pub async fn send_message( let id = uuid::Uuid::new_v4().to_string(); let now = now_secs(); - let (ct, counter) = { + let (ct, counter, seq) = { let mut sess = state.session.lock().await; let session = sess.as_mut().ok_or("no active session")?; - session + let (ct, counter) = session .ratchet .encrypt(content.as_bytes()) - .map_err(|e| e.to_string())? + .map_err(|e| e.to_string())?; + let seq = session.send_seq; + session.send_seq += 1; + (ct, counter, seq) }; let wire = serde_json::to_vec(&WireMessage { @@ -52,6 +59,8 @@ pub async fn send_message( id: &id, ct: B64.encode(&ct), n: counter, + seq, + ts: now, }) .map_err(|e| e.to_string())?; diff --git a/src-tauri/src/commands/session.rs b/src-tauri/src/commands/session.rs index 9bb7c40..1e26748 100644 --- a/src-tauri/src/commands/session.rs +++ b/src-tauri/src/commands/session.rs @@ -307,6 +307,8 @@ async fn handle_incoming( ratchet, stream_writer: writer, started_at: now_secs(), + send_seq: 0, + recv_seq: 0, }); // Emit session established event @@ -418,6 +420,8 @@ pub async fn initiate_session( ratchet, stream_writer: writer, started_at: now_secs(), + send_seq: 0, + recv_seq: 0, }); let _ = app.emit("session_established", serde_json::json!({ "peer_dest": peer_dest })); @@ -460,6 +464,12 @@ struct WireMessage { id: String, ct: String, n: u32, + /// Sender-side sequence number for ordering enforcement + #[serde(default)] + seq: u64, + /// Sender-side unix timestamp (seconds) + #[serde(default)] + ts: u64, } async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Result<()> { @@ -478,6 +488,21 @@ async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Resul let plaintext_buf = { let mut sess = state.session.lock().await; let session = sess.as_mut().ok_or_else(|| anyhow::anyhow!("no session"))?; + + // Enforce message ordering via sender sequence number + if wire.seq != session.recv_seq { + log::warn!( + "out-of-order message: expected seq={}, got seq={}", + session.recv_seq, + wire.seq + ); + // Still process but warn — I2P may reorder occasionally + } + // Advance expected sequence to the maximum seen + 1 + if wire.seq >= session.recv_seq { + session.recv_seq = wire.seq + 1; + } + session.ratchet.decrypt(&ct, wire.n)? }; @@ -489,11 +514,14 @@ async fn handle_incoming_message(app: &AppHandle, frame: &[u8]) -> anyhow::Resul 0 }; + // Use sender timestamp if provided, fall back to local time + let msg_timestamp = if wire.ts > 0 { wire.ts } else { now }; + let entry = MessageEntry { id: wire.id.clone(), content: SecureBuffer::from_slice(content.as_bytes()), is_mine: false, - timestamp: now, + timestamp: msg_timestamp, expires_at, }; // Wipe plaintext intermediate — the content now lives only in SecureBuffer diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs index ff5be48..74ee8ca 100644 --- a/src-tauri/src/state.rs +++ b/src-tauri/src/state.rs @@ -53,6 +53,10 @@ pub struct ActiveSession { /// Write half of the active I2P tunnel stream. pub stream_writer: WriteHalf, pub started_at: u64, + /// Monotonic send-side sequence number for message ordering. + pub send_seq: u64, + /// Expected next receive-side sequence number for ordering enforcement. + pub recv_seq: u64, } // ── Settings ────────────────────────────────────────────────────────────────── diff --git a/src/components/ChatWindow.tsx b/src/components/ChatWindow.tsx index 38f7ed0..cddc0d9 100644 --- a/src/components/ChatWindow.tsx +++ b/src/components/ChatWindow.tsx @@ -95,9 +95,15 @@ export default function ChatWindow() { ); } + // Sort messages by timestamp then by id for consistent ordering + const sorted = [...state.messages].sort((a, b) => { + if (a.timestamp !== b.timestamp) return a.timestamp - b.timestamp; + return a.id.localeCompare(b.id); + }); + return (
- {state.messages.map((msg) => ( + {sorted.map((msg) => ( ))}