Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions dotnet/test/E2E/EventFidelityE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,26 @@ await session.SendAndWaitAsync(new MessageOptions
public async Task Should_Emit_Pending_Messages_Modified_Event_When_Message_Queue_Changes()
{
var session = await CreateSessionAsync();
var pendingMessagesModified = TestHelper.GetNextEventOfTypeAsync<PendingMessagesModifiedEvent>(
session,
static _ => true,
timeout: TimeSpan.FromSeconds(60),
timeoutDescription: "pending_messages.modified event");
var events = new List<SessionEvent>();
session.On<SessionEvent>(evt => { lock (events) { events.Add(evt); } });

await session.SendAsync(new MessageOptions
// Use SendAndWaitAsync + a single event collector to match the pattern
// of every other test in this fixture (and the Rust E2E equivalent).
// The earlier SendAsync + GetFinalAssistantMessageAsync split relied
// on a custom helper with an async-void backfill and required juggling
// two independently-timed awaits, which has been observed to flake in
// CI.
var answer = await session.SendAndWaitAsync(new MessageOptions
{
Prompt = "What is 9+9? Reply with just the number.",
});
}, timeout: TimeSpan.FromSeconds(120));

var answer = await TestHelper.GetFinalAssistantMessageAsync(session);
var pendingEvent = await pendingMessagesModified;
PendingMessagesModifiedEvent? pendingEvent;
lock (events) { pendingEvent = events.OfType<PendingMessagesModifiedEvent>().FirstOrDefault(); }

Assert.NotNull(pendingEvent);
Assert.Contains("18", answer?.Data.Content ?? string.Empty);
Assert.NotNull(answer);
Assert.Contains("18", answer!.Data.Content);

await session.DisposeAsync();
}
Expand Down
38 changes: 28 additions & 10 deletions dotnet/test/Harness/TestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static class TestHelper
using var cts = new CancellationTokenSource(timeout ?? DefaultEventTimeout);

// Both `finalAssistantMessage` and `sawIdle` are set from two threads — the
// subscription callback (CLI read loop) and CheckExistingMessages (RPC reply).
// subscription callback (CLI read loop) and CheckExistingMessagesAsync (RPC reply).
// We complete only once we've observed both, regardless of which path saw which.
var stateLock = new object();
AssistantMessageEvent? finalAssistantMessage = null;
Expand Down Expand Up @@ -59,18 +59,36 @@ void TryComplete()
});

// Backfill from already-delivered messages so we don't lose events that arrived
// between SendAsync returning and the subscription being installed.
CheckExistingMessages();
// between SendAsync returning and the subscription being installed. Run it
// concurrently with the live subscription, but keep the Task observable so any
// exception is propagated through tcs (not the unobserved-task handler) and so
// we can drain it deterministically below. Pass cts.Token so the backfill is
// bounded by the same timeout as the wait itself, and so a hung GetEventsAsync
// can't block the drain in `finally`.
var backfill = CheckExistingMessagesAsync(cts.Token);

using var registration = cts.Token.Register(
static state => ((TaskCompletionSource<AssistantMessageEvent>)state!).TrySetException(
new TimeoutException("Timeout waiting for assistant message")),
tcs);

cts.Token.Register(() => tcs.TrySetException(new TimeoutException("Timeout waiting for assistant message")));

return await tcs.Task;
try
{
return await tcs.Task;
}
finally
{
// Drain the backfill before our `using` scopes (cts, subscription) dispose.
// Any exception was already routed through tcs above, so swallow here.
try { await backfill.ConfigureAwait(false); }
catch (Exception) { /* intentionally ignored: already propagated via tcs */ }
Comment thread
stephentoub marked this conversation as resolved.
}

async void CheckExistingMessages()
async Task CheckExistingMessagesAsync(CancellationToken cancellationToken)
{
try
{
var (existingFinal, existingIdle) = await GetExistingMessagesAsync(session, alreadyIdle);
var (existingFinal, existingIdle) = await GetExistingMessagesAsync(session, alreadyIdle, cancellationToken);
lock (stateLock)
{
// Preserve a newer message captured by the subscription in the meantime.
Expand All @@ -89,9 +107,9 @@ async void CheckExistingMessages()
}
}

private static async Task<(AssistantMessageEvent? Final, bool SawIdle)> GetExistingMessagesAsync(CopilotSession session, bool alreadyIdle)
private static async Task<(AssistantMessageEvent? Final, bool SawIdle)> GetExistingMessagesAsync(CopilotSession session, bool alreadyIdle, CancellationToken cancellationToken = default)
{
var messages = (await session.GetEventsAsync()).ToList();
var messages = (await session.GetEventsAsync(cancellationToken)).ToList();

var lastUserIdx = messages.FindLastIndex(m => m is UserMessageEvent);
var currentTurn = lastUserIdx < 0 ? messages : messages.Skip(lastUserIdx).ToList();
Expand Down
45 changes: 25 additions & 20 deletions go/internal/e2e/event_fidelity_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"sync"
"testing"
"time"

copilot "github.com/github/copilot-sdk/go"
"github.com/github/copilot-sdk/go/internal/e2e/testharness"
Expand Down Expand Up @@ -135,34 +134,40 @@ func TestEventFidelityE2E(t *testing.T) {
}
t.Cleanup(func() { _ = session.Disconnect() })

pendingModified := make(chan *copilot.SessionEvent, 1)
var mu sync.Mutex
var events []copilot.SessionEvent
session.On(func(event copilot.SessionEvent) {
if _, ok := event.Data.(*copilot.PendingMessagesModifiedData); ok {
select {
case pendingModified <- &event:
default:
}
}
mu.Lock()
events = append(events, event)
mu.Unlock()
})

if _, err := session.Send(t.Context(), copilot.MessageOptions{
// SendAndWait collects everything in one round trip and matches the
// pattern of every other test in this file (and the Rust E2E equivalent),
// avoiding the split fire-and-forget + helper pattern that previously
// made this test prone to flakes.
answer, err := session.SendAndWait(t.Context(), copilot.MessageOptions{
Prompt: "What is 9+9? Reply with just the number.",
}); err != nil {
t.Fatalf("Send failed: %v", err)
})
if err != nil {
t.Fatalf("SendAndWait failed: %v", err)
}

select {
case evt := <-pendingModified:
if evt == nil {
t.Error("Expected a non-nil pending_messages.modified event")
snapshot := snapshotEventFidelityEvents(&mu, &events)

var pendingEvent *copilot.SessionEvent
for i := range snapshot {
if _, ok := snapshot[i].Data.(*copilot.PendingMessagesModifiedData); ok {
pendingEvent = &snapshot[i]
break
}
case <-time.After(60 * time.Second):
t.Fatal("Timed out waiting for pending_messages.modified event")
}
if pendingEvent == nil {
t.Error("Expected to observe a pending_messages.modified event")
}

answer, err := testharness.GetFinalAssistantMessage(t.Context(), session)
if err != nil {
t.Fatalf("Failed to get final assistant message: %v", err)
if answer == nil {
t.Fatal("Expected SendAndWait to return an assistant message")
}
if ad, ok := answer.Data.(*copilot.AssistantMessageData); !ok || !strings.Contains(ad.Content, "18") {
t.Errorf("Expected answer to contain '18', got %v", answer.Data)
Expand Down
18 changes: 10 additions & 8 deletions nodejs/test/e2e/event_fidelity.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { join } from "path";
import { describe, expect, it } from "vitest";
import { SessionEvent, approveAll } from "../../src/index.js";
import { createSdkTestContext } from "./harness/sdkTestContext";
import { getFinalAssistantMessage, getNextEventOfType } from "./harness/sdkTestHelper.js";

describe("Event Fidelity", async () => {
const { copilotClient: client, workDir } = await createSdkTestContext();
Expand Down Expand Up @@ -178,17 +177,20 @@ describe("Event Fidelity", async () => {

it("should emit pending messages modified event when message queue changes", async () => {
const session = await client.createSession({ onPermissionRequest: approveAll });
const events: SessionEvent[] = [];
session.on((event) => {
events.push(event);
});

const pendingModifiedP = getNextEventOfType(session, "pending_messages.modified");

void session.send({
// sendAndWait collects everything in one round trip and matches the
// pattern of every other test in this file (and the Rust E2E equivalent),
// avoiding the split fire-and-forget + helper pattern that previously
// made this test prone to flakes.
const answer = await session.sendAndWait({
prompt: "What is 9+9? Reply with just the number.",
});

const [pendingEvent, answer] = await Promise.all([
pendingModifiedP,
getFinalAssistantMessage(session),
]);
const pendingEvent = events.find((e) => e.type === "pending_messages.modified");

expect(pendingEvent).toBeDefined();
expect(answer?.data.content).toContain("18");
Expand Down
68 changes: 32 additions & 36 deletions nodejs/test/e2e/harness/sdkTestHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,50 +8,46 @@ export async function getFinalAssistantMessage(
session: CopilotSession,
{ alreadyIdle = false }: { alreadyIdle?: boolean } = {}
): Promise<AssistantMessageEvent> {
// We don't know whether the answer has already arrived or not, so race both possibilities
return new Promise<AssistantMessageEvent>(async (resolve, reject) => {
getFutureFinalResponse(session).then(resolve).catch(reject);
getExistingFinalResponse(session, alreadyIdle)
.then((msg) => {
if (msg) {
resolve(msg);
}
})
.catch(reject);
});
// Install the live subscription (via getFutureFinalResponse) before issuing the
// existing-messages RPC so we don't miss events that arrive while that RPC is in flight.
const futurePromise = getFutureFinalResponse(session);
// We may end up returning from the existing-messages path; attach a noop handler so
// the unawaited future-response rejection doesn't surface as an unhandled rejection.
futurePromise.catch(() => {});

const existing = await getExistingFinalResponse(session, alreadyIdle);
if (existing) {
return existing;
}
return futurePromise;
}

function getExistingFinalResponse(
async function getExistingFinalResponse(
session: CopilotSession,
alreadyIdle: boolean = false
): Promise<AssistantMessageEvent | undefined> {
return new Promise<AssistantMessageEvent | undefined>(async (resolve, reject) => {
const messages = await session.getEvents();
const finalUserMessageIndex = messages.findLastIndex((m) => m.type === "user.message");
const currentTurnMessages =
finalUserMessageIndex < 0 ? messages : messages.slice(finalUserMessageIndex);
const messages = await session.getEvents();
const finalUserMessageIndex = messages.findLastIndex((m) => m.type === "user.message");
const currentTurnMessages =
finalUserMessageIndex < 0 ? messages : messages.slice(finalUserMessageIndex);

const currentTurnError = currentTurnMessages.find((m) => m.type === "session.error");
if (currentTurnError) {
const error = new Error(currentTurnError.data.message);
error.stack = currentTurnError.data.stack;
reject(error);
return;
}
const currentTurnError = currentTurnMessages.find((m) => m.type === "session.error");
if (currentTurnError) {
const error = new Error(currentTurnError.data.message);
error.stack = currentTurnError.data.stack;
throw error;
}

const sessionIdleMessageIndex = alreadyIdle
? currentTurnMessages.length
: currentTurnMessages.findIndex((m) => m.type === "session.idle");
if (sessionIdleMessageIndex !== -1) {
const lastAssistantMessage = currentTurnMessages
.slice(0, sessionIdleMessageIndex)
.findLast((m) => m.type === "assistant.message");
resolve(lastAssistantMessage as AssistantMessageEvent | undefined);
return;
}
const sessionIdleMessageIndex = alreadyIdle
? currentTurnMessages.length
: currentTurnMessages.findIndex((m) => m.type === "session.idle");
if (sessionIdleMessageIndex !== -1) {
return currentTurnMessages
.slice(0, sessionIdleMessageIndex)
.findLast((m) => m.type === "assistant.message") as AssistantMessageEvent | undefined;
}

resolve(undefined);
});
return undefined;
}

function getFutureFinalResponse(session: CopilotSession): Promise<AssistantMessageEvent> {
Expand Down
26 changes: 11 additions & 15 deletions python/e2e/test_event_fidelity_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import asyncio
from pathlib import Path

import pytest
Expand Down Expand Up @@ -178,22 +177,19 @@ async def test_should_emit_pending_messages_modified_event_when_message_queue_ch
session = await ctx.client.create_session(
on_permission_request=PermissionHandler.approve_all
)
pending_task: asyncio.Future = asyncio.get_event_loop().create_future()

def on_event(event):
if isinstance(event.data, PendingMessagesModifiedData) and not pending_task.done():
pending_task.set_result(event)

unsubscribe = session.on(on_event)
events = []
unsubscribe = session.on(events.append)
try:
# Fire-and-forget to trigger pending_messages.modified; then wait for it
asyncio.ensure_future(session.send("What is 9+9? Reply with just the number."))
pending_event = await asyncio.wait_for(pending_task, timeout=60.0)
# send_and_wait collects everything in one round trip and matches the
# pattern of every other test in this file (and the Rust E2E equivalent),
# avoiding the split fire-and-forget + helper pattern that previously
# made this test prone to flakes.
answer = await session.send_and_wait("What is 9+9? Reply with just the number.")

pending_event = next(
(e for e in events if isinstance(e.data, PendingMessagesModifiedData)), None
)
assert pending_event is not None

from .testharness.helper import get_final_assistant_message

answer = await get_final_assistant_message(session, timeout=60.0)
assert answer is not None
assert "18" in (answer.data.content or "")
finally:
Expand Down
Loading