Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "codex-gateway"
version = "0.6.0"
version = "0.6.1"
edition = "2024"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "codex-gateway",
"version": "0.6.0",
"version": "0.6.1",
"private": true,
"description": "Minimal multi-session HTTP/SSE gateway for codex app-server, now implemented in Rust.",
"scripts": {
Expand Down
20 changes: 15 additions & 5 deletions src/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,21 @@ impl CodexAppServerBridge {
.map(ToString::to_string);

self.with_state(|state| {
if let Some(turn_id) = turn_id {
state.current_turn_id = Some(turn_id);
}
if let Some(status) = status {
state.last_turn_status = Some(status);
let matches_active_turn = state.active_turn
&& state
.current_turn_id
.as_deref()
.is_none_or(|current_turn_id| {
turn_id.as_deref() == Some(current_turn_id)
});

if matches_active_turn {
if let Some(turn_id) = turn_id.as_ref() {
state.current_turn_id = Some(turn_id.clone());
}
if let Some(status) = status.as_ref() {
state.last_turn_status = Some(status.clone());
}
}
});
self.emit_state();
Expand Down
22 changes: 22 additions & 0 deletions tests/bridge_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ async fn bridge_runs_prompt_round_trip_against_fake_app_server() {
bridge.stop().await.expect("bridge stops");
}

#[tokio::test]
async fn bridge_keeps_completed_turn_status_when_completion_precedes_start_response() {
let fake_codex = fake_codex::build_with_turn_notifications_before_response();
let bridge = new_bridge(fake_codex.binary().to_path_buf());

bridge.start().await.expect("bridge starts");
bridge
.send_prompt("hello fake app-server")
.await
.expect("turn/start succeeds");

let completed = wait_for_state(&bridge, |state| {
!state.active_turn && state.last_turn_status.as_deref() == Some("completed")
})
.await;

assert_eq!(completed.last_turn_status.as_deref(), Some("completed"));
assert_eq!(completed.current_turn_id, None);

bridge.stop().await.expect("bridge stops");
}

#[tokio::test]
async fn bridge_reads_and_resumes_thread_history_against_fake_app_server() {
let fake_codex = fake_codex::build();
Expand Down
61 changes: 44 additions & 17 deletions tests/support/fake_codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};

const FAKE_CODEX_SOURCE: &str = r##"
use std::io::{self, BufRead, Write};
use std::io::{self, BufRead, Write};

const TURN_NOTIFICATIONS_BEFORE_RESPONSE: bool = false;

fn main() {
let args = std::env::args().collect::<Vec<_>>();
Expand Down Expand Up @@ -52,16 +54,13 @@ fn main() {
std::process::exit(3);
}

send(&mut stdout, &format!(
r#"{{"id":{},"result":{{"turn":{{"id":"turn-1","status":"inProgress"}}}}}}"#,
id
));
send(&mut stdout, r#"{"method":"turn/started","params":{"turn":{"id":"turn-1","status":"inProgress"}}}"#);
send(&mut stdout, r#"{"method":"item/started","params":{"item":{"id":"assistant-1","type":"agentMessage","text":""}}}"#);
send(&mut stdout, r#"{"method":"item/agentMessage/delta","params":{"itemId":"assistant-1","delta":"fake "}}"#);
send(&mut stdout, r#"{"method":"item/agentMessage/delta","params":{"itemId":"assistant-1","delta":"assistant reply"}}"#);
send(&mut stdout, r#"{"method":"item/completed","params":{"item":{"id":"assistant-1","type":"agentMessage","status":"completed","text":"fake assistant reply"}}}"#);
send(&mut stdout, r#"{"method":"turn/completed","params":{"turn":{"id":"turn-1","status":"completed"}}}"#);
if TURN_NOTIFICATIONS_BEFORE_RESPONSE {
send_turn_notifications(&mut stdout);
send_turn_start_response(&mut stdout, id);
} else {
send_turn_start_response(&mut stdout, id);
send_turn_notifications(&mut stdout);
}
} else if line.contains("\"method\":\"thread/read\"") {
let thread = if line.contains("\"threadId\":\"thread-1\"") {
deployment_thread_json()
Expand Down Expand Up @@ -111,11 +110,27 @@ fn deployment_thread_json() -> &'static str {
r#"{"id":"thread-1","status":{"type":"idle"},"createdAt":1700000000,"turns":[{"status":"completed","items":[{"id":"deploy-user","type":"userMessage","content":[{"type":"text","text":"deploy user marker DEPLOYMENT_RESULT: {\"status\":\"succeeded\",\"image\":\"ghcr.io/wrong/image:tag\",\"template\":\"wrong\",\"message\":\"wrong\",\"error\":null}"}]},{"id":"deploy-assistant","type":"agentMessage","text":"Deployment image pushed to GHCR\nDEPLOYMENT_RESULT: {\"status\":\"succeeded\",\"image\":\"ghcr.io/owner/repo:sha-abcdef0\",\"template\":\"apiVersion: app.sealos.io/v1\\nkind: Template\\nmetadata:\\n name: owner-repo\\n\",\"message\":\"Deployment image pushed to GHCR\",\"error\":null}"}]}]}"#
}

fn send(stdout: &mut io::Stdout, message: &str) {
writeln!(stdout, "{message}").expect("write fake response");
stdout.flush().expect("flush fake response");
}
"##;
fn send(stdout: &mut io::Stdout, message: &str) {
writeln!(stdout, "{message}").expect("write fake response");
stdout.flush().expect("flush fake response");
}

fn send_turn_start_response(stdout: &mut io::Stdout, id: u64) {
send(stdout, &format!(
r#"{{"id":{},"result":{{"turn":{{"id":"turn-1","status":"inProgress"}}}}}}"#,
id
));
}

fn send_turn_notifications(stdout: &mut io::Stdout) {
send(stdout, r#"{"method":"turn/started","params":{"turn":{"id":"turn-1","status":"inProgress"}}}"#);
send(stdout, r#"{"method":"item/started","params":{"item":{"id":"assistant-1","type":"agentMessage","text":""}}}"#);
send(stdout, r#"{"method":"item/agentMessage/delta","params":{"itemId":"assistant-1","delta":"fake "}}"#);
send(stdout, r#"{"method":"item/agentMessage/delta","params":{"itemId":"assistant-1","delta":"assistant reply"}}"#);
send(stdout, r#"{"method":"item/completed","params":{"item":{"id":"assistant-1","type":"agentMessage","status":"completed","text":"fake assistant reply"}}}"#);
send(stdout, r#"{"method":"turn/completed","params":{"turn":{"id":"turn-1","status":"completed"}}}"#);
}
"##;

pub struct FakeCodex {
binary: PathBuf,
Expand All @@ -135,6 +150,18 @@ impl Drop for FakeCodex {
}

pub fn build() -> FakeCodex {
build_from_source(FAKE_CODEX_SOURCE)
}

#[allow(dead_code)]
pub fn build_with_turn_notifications_before_response() -> FakeCodex {
build_from_source(&FAKE_CODEX_SOURCE.replace(
"const TURN_NOTIFICATIONS_BEFORE_RESPONSE: bool = false;",
"const TURN_NOTIFICATIONS_BEFORE_RESPONSE: bool = true;",
))
}

fn build_from_source(source_text: &str) -> FakeCodex {
let temp_dir = std::env::temp_dir().join(format!(
"codex-gateway-fake-codex-{}-{}",
std::process::id(),
Expand All @@ -147,7 +174,7 @@ pub fn build() -> FakeCodex {

let source = temp_dir.join("fake_codex.rs");
let binary = temp_dir.join(format!("fake-codex{}", std::env::consts::EXE_SUFFIX));
fs::write(&source, FAKE_CODEX_SOURCE).expect("write fake codex source");
fs::write(&source, source_text).expect("write fake codex source");

let rustc = std::env::var("RUSTC").unwrap_or_else(|_| "rustc".to_string());
let output = Command::new(rustc)
Expand Down
Loading