From b4b23da4c50f29fcd63d727daf074f39e34a828f Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 21 May 2026 11:10:38 -0400 Subject: [PATCH 1/3] Fix flaky pending-messages-modified E2E test across SDKs The pending-messages-modified event-fidelity test was failing intermittently in the C# SDK CI leg. The root cause was that this test was the only one in its fixture not using the standard `SendAndWaitAsync` + event-collector pattern; it went through a custom helper that did two independently-timed awaits and used an `async void` local function for backfilling existing messages. Refactor the test to the standard pattern in C#, Node, Python, and Go, and while here, replace the `async void` (and its JS analog, `new Promise(async (...) => ...)`) with proper `async Task` / `async` functions drained deterministically. The C# helper now has zero `async void` methods. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dotnet/test/E2E/EventFidelityE2ETests.cs | 24 ++++---- dotnet/test/Harness/TestHelper.cs | 27 +++++++-- go/internal/e2e/event_fidelity_e2e_test.go | 45 +++++++------- nodejs/test/e2e/event_fidelity.e2e.test.ts | 18 +++--- nodejs/test/e2e/harness/sdkTestHelper.ts | 68 ++++++++++------------ python/e2e/test_event_fidelity_e2e.py | 26 ++++----- 6 files changed, 113 insertions(+), 95 deletions(-) diff --git a/dotnet/test/E2E/EventFidelityE2ETests.cs b/dotnet/test/E2E/EventFidelityE2ETests.cs index 4504984f9..8882f972d 100644 --- a/dotnet/test/E2E/EventFidelityE2ETests.cs +++ b/dotnet/test/E2E/EventFidelityE2ETests.cs @@ -136,22 +136,26 @@ await session.SendAndWaitAsync(new MessageOptions public async Task Should_Emit_Pending_Messages_Modified_Event_When_Message_Queue_Changes() { var session = await CreateSessionAsync(); - var pendingMessagesModified = TestHelper.GetNextEventOfTypeAsync( - session, - static _ => true, - timeout: TimeSpan.FromSeconds(60), - timeoutDescription: "pending_messages.modified event"); + var events = new List(); + 
session.On(evt => { lock (events) { events.Add(evt); } }); - await session.SendAsync(new MessageOptions + // Use SendAndWaitAsync + a single event collector to match the pattern + // of every other test in this fixture (and the Rust E2E equivalent). + // The earlier SendAsync + GetFinalAssistantMessageAsync split relied + // on a custom helper with an async-void backfill and required juggling + // two independently-timed awaits, which has been observed to flake in + // CI. + var answer = await session.SendAndWaitAsync(new MessageOptions { Prompt = "What is 9+9? Reply with just the number.", - }); + }, timeout: TimeSpan.FromSeconds(120)); - var answer = await TestHelper.GetFinalAssistantMessageAsync(session); - var pendingEvent = await pendingMessagesModified; + PendingMessagesModifiedEvent? pendingEvent; + lock (events) { pendingEvent = events.OfType().FirstOrDefault(); } Assert.NotNull(pendingEvent); - Assert.Contains("18", answer?.Data.Content ?? string.Empty); + Assert.NotNull(answer); + Assert.Contains("18", answer!.Data.Content); await session.DisposeAsync(); } diff --git a/dotnet/test/Harness/TestHelper.cs b/dotnet/test/Harness/TestHelper.cs index 5d5839618..ed4b140c7 100644 --- a/dotnet/test/Harness/TestHelper.cs +++ b/dotnet/test/Harness/TestHelper.cs @@ -22,7 +22,7 @@ public static class TestHelper using var cts = new CancellationTokenSource(timeout ?? DefaultEventTimeout); // Both `finalAssistantMessage` and `sawIdle` are set from two threads — the - // subscription callback (CLI read loop) and CheckExistingMessages (RPC reply). + // subscription callback (CLI read loop) and CheckExistingMessagesAsync (RPC reply). // We complete only once we've observed both, regardless of which path saw which. var stateLock = new object(); AssistantMessageEvent? 
finalAssistantMessage = null; @@ -59,14 +59,29 @@ void TryComplete() }); // Backfill from already-delivered messages so we don't lose events that arrived - // between SendAsync returning and the subscription being installed. - CheckExistingMessages(); + // between SendAsync returning and the subscription being installed. Run it + // concurrently with the live subscription, but keep the Task observable so any + // exception is propagated through tcs (not the unobserved-task handler) and so + // we can drain it deterministically below. + var backfill = CheckExistingMessagesAsync(); - cts.Token.Register(() => tcs.TrySetException(new TimeoutException("Timeout waiting for assistant message"))); + using var registration = cts.Token.Register( + static state => ((TaskCompletionSource)state!).TrySetException( + new TimeoutException("Timeout waiting for assistant message")), + tcs); - return await tcs.Task; + try + { + return await tcs.Task; + } + finally + { + // Drain the backfill before our `using` scopes (cts, subscription) dispose. + // Any exception was already routed through tcs above, so swallow here. 
+ try { await backfill.ConfigureAwait(false); } catch { } + } - async void CheckExistingMessages() + async Task CheckExistingMessagesAsync() { try { diff --git a/go/internal/e2e/event_fidelity_e2e_test.go b/go/internal/e2e/event_fidelity_e2e_test.go index f75fcffad..46db285c5 100644 --- a/go/internal/e2e/event_fidelity_e2e_test.go +++ b/go/internal/e2e/event_fidelity_e2e_test.go @@ -6,7 +6,6 @@ import ( "strings" "sync" "testing" - "time" copilot "github.com/github/copilot-sdk/go" "github.com/github/copilot-sdk/go/internal/e2e/testharness" @@ -135,34 +134,40 @@ func TestEventFidelityE2E(t *testing.T) { } t.Cleanup(func() { _ = session.Disconnect() }) - pendingModified := make(chan *copilot.SessionEvent, 1) + var mu sync.Mutex + var events []copilot.SessionEvent session.On(func(event copilot.SessionEvent) { - if _, ok := event.Data.(*copilot.PendingMessagesModifiedData); ok { - select { - case pendingModified <- &event: - default: - } - } + mu.Lock() + events = append(events, event) + mu.Unlock() }) - if _, err := session.Send(t.Context(), copilot.MessageOptions{ + // SendAndWait collects everything in one round trip and matches the + // pattern of every other test in this file (and the Rust E2E equivalent), + // avoiding the split fire-and-forget + helper pattern that previously + // made this test prone to flakes. + answer, err := session.SendAndWait(t.Context(), copilot.MessageOptions{ Prompt: "What is 9+9? 
Reply with just the number.", - }); err != nil { - t.Fatalf("Send failed: %v", err) + }) + if err != nil { + t.Fatalf("SendAndWait failed: %v", err) } - select { - case evt := <-pendingModified: - if evt == nil { - t.Error("Expected a non-nil pending_messages.modified event") + snapshot := snapshotEventFidelityEvents(&mu, &events) + + var pendingEvent *copilot.SessionEvent + for i := range snapshot { + if _, ok := snapshot[i].Data.(*copilot.PendingMessagesModifiedData); ok { + pendingEvent = &snapshot[i] + break } - case <-time.After(60 * time.Second): - t.Fatal("Timed out waiting for pending_messages.modified event") + } + if pendingEvent == nil { + t.Error("Expected to observe a pending_messages.modified event") } - answer, err := testharness.GetFinalAssistantMessage(t.Context(), session) - if err != nil { - t.Fatalf("Failed to get final assistant message: %v", err) + if answer == nil { + t.Fatal("Expected SendAndWait to return an assistant message") } if ad, ok := answer.Data.(*copilot.AssistantMessageData); !ok || !strings.Contains(ad.Content, "18") { t.Errorf("Expected answer to contain '18', got %v", answer.Data) diff --git a/nodejs/test/e2e/event_fidelity.e2e.test.ts b/nodejs/test/e2e/event_fidelity.e2e.test.ts index 95b554bcd..da4b8105a 100644 --- a/nodejs/test/e2e/event_fidelity.e2e.test.ts +++ b/nodejs/test/e2e/event_fidelity.e2e.test.ts @@ -7,7 +7,6 @@ import { join } from "path"; import { describe, expect, it } from "vitest"; import { SessionEvent, approveAll } from "../../src/index.js"; import { createSdkTestContext } from "./harness/sdkTestContext"; -import { getFinalAssistantMessage, getNextEventOfType } from "./harness/sdkTestHelper.js"; describe("Event Fidelity", async () => { const { copilotClient: client, workDir } = await createSdkTestContext(); @@ -178,17 +177,20 @@ describe("Event Fidelity", async () => { it("should emit pending messages modified event when message queue changes", async () => { const session = await client.createSession({ 
onPermissionRequest: approveAll }); + const events: SessionEvent[] = []; + session.on((event) => { + events.push(event); + }); - const pendingModifiedP = getNextEventOfType(session, "pending_messages.modified"); - - void session.send({ + // sendAndWait collects everything in one round trip and matches the + // pattern of every other test in this file (and the Rust E2E equivalent), + // avoiding the split fire-and-forget + helper pattern that previously + // made this test prone to flakes. + const answer = await session.sendAndWait({ prompt: "What is 9+9? Reply with just the number.", }); - const [pendingEvent, answer] = await Promise.all([ - pendingModifiedP, - getFinalAssistantMessage(session), - ]); + const pendingEvent = events.find((e) => e.type === "pending_messages.modified"); expect(pendingEvent).toBeDefined(); expect(answer?.data.content).toContain("18"); diff --git a/nodejs/test/e2e/harness/sdkTestHelper.ts b/nodejs/test/e2e/harness/sdkTestHelper.ts index 18c893f88..7af18766f 100644 --- a/nodejs/test/e2e/harness/sdkTestHelper.ts +++ b/nodejs/test/e2e/harness/sdkTestHelper.ts @@ -8,50 +8,46 @@ export async function getFinalAssistantMessage( session: CopilotSession, { alreadyIdle = false }: { alreadyIdle?: boolean } = {} ): Promise { - // We don't know whether the answer has already arrived or not, so race both possibilities - return new Promise(async (resolve, reject) => { - getFutureFinalResponse(session).then(resolve).catch(reject); - getExistingFinalResponse(session, alreadyIdle) - .then((msg) => { - if (msg) { - resolve(msg); - } - }) - .catch(reject); - }); + // Start listening for the answer immediately so we don't miss any events that arrive + // between the existing-messages RPC starting and the subscription being installed. 
+ const futurePromise = getFutureFinalResponse(session); + // We may end up returning from the existing-messages path; attach a noop handler so + // the unawaited future-response rejection doesn't surface as an unhandled rejection. + futurePromise.catch(() => {}); + + const existing = await getExistingFinalResponse(session, alreadyIdle); + if (existing) { + return existing; + } + return futurePromise; } -function getExistingFinalResponse( +async function getExistingFinalResponse( session: CopilotSession, alreadyIdle: boolean = false ): Promise { - return new Promise(async (resolve, reject) => { - const messages = await session.getEvents(); - const finalUserMessageIndex = messages.findLastIndex((m) => m.type === "user.message"); - const currentTurnMessages = - finalUserMessageIndex < 0 ? messages : messages.slice(finalUserMessageIndex); + const messages = await session.getEvents(); + const finalUserMessageIndex = messages.findLastIndex((m) => m.type === "user.message"); + const currentTurnMessages = + finalUserMessageIndex < 0 ? messages : messages.slice(finalUserMessageIndex); - const currentTurnError = currentTurnMessages.find((m) => m.type === "session.error"); - if (currentTurnError) { - const error = new Error(currentTurnError.data.message); - error.stack = currentTurnError.data.stack; - reject(error); - return; - } + const currentTurnError = currentTurnMessages.find((m) => m.type === "session.error"); + if (currentTurnError) { + const error = new Error(currentTurnError.data.message); + error.stack = currentTurnError.data.stack; + throw error; + } - const sessionIdleMessageIndex = alreadyIdle - ? 
currentTurnMessages.length - : currentTurnMessages.findIndex((m) => m.type === "session.idle"); - if (sessionIdleMessageIndex !== -1) { - const lastAssistantMessage = currentTurnMessages - .slice(0, sessionIdleMessageIndex) - .findLast((m) => m.type === "assistant.message"); - resolve(lastAssistantMessage as AssistantMessageEvent | undefined); - return; - } + const sessionIdleMessageIndex = alreadyIdle + ? currentTurnMessages.length + : currentTurnMessages.findIndex((m) => m.type === "session.idle"); + if (sessionIdleMessageIndex !== -1) { + return currentTurnMessages + .slice(0, sessionIdleMessageIndex) + .findLast((m) => m.type === "assistant.message") as AssistantMessageEvent | undefined; + } - resolve(undefined); - }); + return undefined; } function getFutureFinalResponse(session: CopilotSession): Promise { diff --git a/python/e2e/test_event_fidelity_e2e.py b/python/e2e/test_event_fidelity_e2e.py index a292247df..17193a308 100644 --- a/python/e2e/test_event_fidelity_e2e.py +++ b/python/e2e/test_event_fidelity_e2e.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio from pathlib import Path import pytest @@ -178,22 +177,19 @@ async def test_should_emit_pending_messages_modified_event_when_message_queue_ch session = await ctx.client.create_session( on_permission_request=PermissionHandler.approve_all ) - pending_task: asyncio.Future = asyncio.get_event_loop().create_future() - - def on_event(event): - if isinstance(event.data, PendingMessagesModifiedData) and not pending_task.done(): - pending_task.set_result(event) - - unsubscribe = session.on(on_event) + events = [] + unsubscribe = session.on(events.append) try: - # Fire-and-forget to trigger pending_messages.modified; then wait for it - asyncio.ensure_future(session.send("What is 9+9? 
Reply with just the number.")) - pending_event = await asyncio.wait_for(pending_task, timeout=60.0) + # send_and_wait collects everything in one round trip and matches the + # pattern of every other test in this file (and the Rust E2E equivalent), + # avoiding the split fire-and-forget + helper pattern that previously + # made this test prone to flakes. + answer = await session.send_and_wait("What is 9+9? Reply with just the number.") + + pending_event = next( + (e for e in events if isinstance(e.data, PendingMessagesModifiedData)), None + ) assert pending_event is not None - - from .testharness.helper import get_final_assistant_message - - answer = await get_final_assistant_message(session, timeout=60.0) assert answer is not None assert "18" in (answer.data.content or "") finally: From 634bd9ef35ffd6dc3ab9f5ba8fcc687662d9bb07 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 21 May 2026 11:18:13 -0400 Subject: [PATCH 2/3] Address review feedback: explicit catch and corrected comment - TestHelper.cs: replace bare `catch { }` in the backfill-drain `finally` with `catch (Exception) { /* intentionally ignored: already propagated via tcs */ }` to make the swallow intent explicit and satisfy CodeQL. - sdkTestHelper.ts: update the comment above `getFutureFinalResponse` to reflect that the live subscription is installed before the existing- messages RPC fires, matching the actual call ordering. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dotnet/test/Harness/TestHelper.cs | 3 ++- nodejs/test/e2e/harness/sdkTestHelper.ts | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dotnet/test/Harness/TestHelper.cs b/dotnet/test/Harness/TestHelper.cs index ed4b140c7..f4af794b4 100644 --- a/dotnet/test/Harness/TestHelper.cs +++ b/dotnet/test/Harness/TestHelper.cs @@ -78,7 +78,8 @@ void TryComplete() { // Drain the backfill before our `using` scopes (cts, subscription) dispose. 
// Any exception was already routed through tcs above, so swallow here. - try { await backfill.ConfigureAwait(false); } catch { } + try { await backfill.ConfigureAwait(false); } + catch (Exception) { /* intentionally ignored: already propagated via tcs */ } } async Task CheckExistingMessagesAsync() diff --git a/nodejs/test/e2e/harness/sdkTestHelper.ts b/nodejs/test/e2e/harness/sdkTestHelper.ts index 7af18766f..a28c2ae5b 100644 --- a/nodejs/test/e2e/harness/sdkTestHelper.ts +++ b/nodejs/test/e2e/harness/sdkTestHelper.ts @@ -8,8 +8,8 @@ export async function getFinalAssistantMessage( session: CopilotSession, { alreadyIdle = false }: { alreadyIdle?: boolean } = {} ): Promise { - // Start listening for the answer immediately so we don't miss any events that arrive - // between the existing-messages RPC starting and the subscription being installed. + // Install the live subscription (via getFutureFinalResponse) before issuing the + // existing-messages RPC so we don't miss events that arrive while that RPC is in flight. const futurePromise = getFutureFinalResponse(session); // We may end up returning from the existing-messages path; attach a noop handler so // the unawaited future-response rejection doesn't surface as an unhandled rejection. From 265ec3e2d3b4ec156bc11daa90e4e397649303a1 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 21 May 2026 11:20:55 -0400 Subject: [PATCH 3/3] Bound backfill drain by timeout token Thread cts.Token through CheckExistingMessagesAsync and GetExistingMessagesAsync into session.GetEventsAsync so a hung backfill can't delay (or prevent) the TimeoutException from surfacing through the `finally` drain. Flagged by the Copilot PR reviewer. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dotnet/test/Harness/TestHelper.cs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/dotnet/test/Harness/TestHelper.cs b/dotnet/test/Harness/TestHelper.cs index f4af794b4..a230ddb81 100644 --- a/dotnet/test/Harness/TestHelper.cs +++ b/dotnet/test/Harness/TestHelper.cs @@ -62,8 +62,10 @@ void TryComplete() // between SendAsync returning and the subscription being installed. Run it // concurrently with the live subscription, but keep the Task observable so any // exception is propagated through tcs (not the unobserved-task handler) and so - // we can drain it deterministically below. - var backfill = CheckExistingMessagesAsync(); + // we can drain it deterministically below. Pass cts.Token so the backfill is + // bounded by the same timeout as the wait itself, and so a hung GetEventsAsync + // can't block the drain in `finally`. + var backfill = CheckExistingMessagesAsync(cts.Token); using var registration = cts.Token.Register( static state => ((TaskCompletionSource)state!).TrySetException( @@ -82,11 +84,11 @@ void TryComplete() catch (Exception) { /* intentionally ignored: already propagated via tcs */ } } - async Task CheckExistingMessagesAsync() + async Task CheckExistingMessagesAsync(CancellationToken cancellationToken) { try { - var (existingFinal, existingIdle) = await GetExistingMessagesAsync(session, alreadyIdle); + var (existingFinal, existingIdle) = await GetExistingMessagesAsync(session, alreadyIdle, cancellationToken); lock (stateLock) { // Preserve a newer message captured by the subscription in the meantime. @@ -105,9 +107,9 @@ async Task CheckExistingMessagesAsync() } } - private static async Task<(AssistantMessageEvent? Final, bool SawIdle)> GetExistingMessagesAsync(CopilotSession session, bool alreadyIdle) + private static async Task<(AssistantMessageEvent? 
Final, bool SawIdle)> GetExistingMessagesAsync(CopilotSession session, bool alreadyIdle, CancellationToken cancellationToken = default) { - var messages = (await session.GetEventsAsync()).ToList(); + var messages = (await session.GetEventsAsync(cancellationToken)).ToList(); var lastUserIdx = messages.FindLastIndex(m => m is UserMessageEvent); var currentTurn = lastUserIdx < 0 ? messages : messages.Skip(lastUserIdx).ToList();