Skip to content

Commit f27d752

Browse files
committed
Fix failing tests; add no-op close implementation
1 parent fc34833 commit f27d752

4 files changed

Lines changed: 51 additions & 44 deletions

File tree

client/client-core/src/main/java/software/amazon/smithy/java/client/core/ClientTransport.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package software.amazon.smithy.java.client.core;
77

88
import java.io.Closeable;
9+
import java.io.IOException;
910
import java.net.ConnectException;
1011
import java.net.ProtocolException;
1112
import java.net.SocketException;
@@ -47,6 +48,14 @@ public interface ClientTransport<RequestT, ResponseT> extends Closeable {
4748
*/
4849
MessageExchange<RequestT, ResponseT> messageExchange();
4950

51+
/**
52+
* {@inheritDoc}
53+
*
54+
* <p>Default implementation is a no-op.
55+
*/
56+
@Override
57+
default void close() throws IOException {}
58+
5059
/**
5160
* Remaps a thrown exception to an appropriate {@link TransportException} or {@link CallException}.
5261
*

http/http-client/src/it/java/software/amazon/smithy/java/http/client/it/h2/TrailerHeadersHttp2Test.java

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.junit.jupiter.api.Assertions.assertNotNull;
10-
import static org.junit.jupiter.api.Assertions.fail;
1110

1211
import java.util.Map;
1312
import org.junit.jupiter.api.Test;
@@ -43,28 +42,15 @@ protected HttpConnectionPoolBuilder configurePool(HttpConnectionPoolBuilder buil
4342

4443
@Test
4544
void readsResponseWithTrailers() throws Exception {
46-
// Retry up to 5 times due to timing sensitivity in H2 frame ordering
47-
Exception lastException = null;
48-
for (int attempt = 0; attempt < 5; attempt++) {
49-
try {
50-
var request = plainTextRequest(HttpVersion.HTTP_2, "");
51-
try (var exchange = client.newExchange(request)) {
52-
var body = new String(exchange.responseBody().readAllBytes());
53-
assertEquals(RESPONSE_CONTENTS, body);
54-
55-
var trailers = exchange.responseTrailerHeaders();
56-
assertNotNull(trailers, "Should have trailer headers");
57-
assertEquals("abc123", trailers.firstValue("x-checksum"));
58-
assertEquals("req-456", trailers.firstValue("x-request-id"));
59-
}
60-
return; // Success
61-
} catch (Exception e) {
62-
lastException = e;
63-
// Recreate client/server for retry
64-
tearDown();
65-
setUp();
66-
}
45+
var request = plainTextRequest(HttpVersion.HTTP_2, "");
46+
try (var exchange = client.newExchange(request)) {
47+
var body = new String(exchange.responseBody().readAllBytes());
48+
assertEquals(RESPONSE_CONTENTS, body);
49+
50+
var trailers = exchange.responseTrailerHeaders();
51+
assertNotNull(trailers, "Should have trailer headers");
52+
assertEquals("abc123", trailers.firstValue("x-checksum"));
53+
assertEquals("req-456", trailers.firstValue("x-request-id"));
6754
}
68-
fail("Test failed after 5 attempts", lastException);
6955
}
7056
}

http/http-client/src/main/java/software/amazon/smithy/java/http/client/h2/H2Exchange.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public final class H2Exchange implements HttpExchange {
8383
private final H2StreamState state = new H2StreamState();
8484

8585
// Pending headers from reader thread (protected by dataLock)
86-
private List<String> pendingHeaders;
87-
private boolean pendingHeadersEndStream;
86+
private record PendingHeadersEvent(List<String> fields, boolean endStream) {}
87+
private final ArrayDeque<PendingHeadersEvent> pendingHeadersQueue = new ArrayDeque<>();
8888

8989
// === Data chunk queue ===
9090
// Queue of DataChunks received from reader thread. Each chunk contains one DATA frame payload.
@@ -352,8 +352,7 @@ void returnBuffer(byte[] buffer) {
352352
void deliverHeaders(List<String> fields, boolean endStream) {
353353
dataLock.lock();
354354
try {
355-
pendingHeaders = fields;
356-
pendingHeadersEndStream = endStream;
355+
pendingHeadersQueue.add(new PendingHeadersEvent(fields, endStream));
357356
} finally {
358357
dataLock.unlock();
359358
}
@@ -496,11 +495,9 @@ int drainChunks(DataChunk[] dest, int maxChunks) throws IOException {
496495
// Wait for data, EOF, or error
497496
while (dataQueue.isEmpty() && state.getReadState() == RS_READING) {
498497
// Check for pending trailers
499-
if (pendingHeaders != null) {
500-
List<String> fields = pendingHeaders;
501-
boolean endStream = pendingHeadersEndStream;
502-
pendingHeaders = null;
503-
handleHeadersEvent(fields, endStream);
498+
PendingHeadersEvent headerEvent = pendingHeadersQueue.poll();
499+
if (headerEvent != null) {
500+
handleHeadersEvent(headerEvent.fields(), headerEvent.endStream());
504501
if (state.getReadState() == RS_DONE) {
505502
break;
506503
}
@@ -726,7 +723,7 @@ private void awaitEvent() throws IOException {
726723
try {
727724
// Wait for headers, error, or data (which also signals)
728725
int rs;
729-
while (pendingHeaders == null && (rs = state.getReadState()) != RS_ERROR && rs != RS_DONE) {
726+
while (pendingHeadersQueue.isEmpty() && (rs = state.getReadState()) != RS_ERROR && rs != RS_DONE) {
730727
// Wait using lock-free signaling
731728
waitingThread = Thread.currentThread();
732729
dataLock.unlock();
@@ -764,13 +761,10 @@ private void readResponseHeaders() throws IOException {
764761

765762
dataLock.lock();
766763
try {
767-
if (pendingHeaders != null) {
768-
List<String> fields = pendingHeaders;
769-
boolean endStream = pendingHeadersEndStream;
770-
pendingHeaders = null; // Consume the headers
771-
764+
PendingHeadersEvent headerEvent = pendingHeadersQueue.poll();
765+
if (headerEvent != null) {
772766
// Process headers (can throw)
773-
handleHeadersEvent(fields, endStream);
767+
handleHeadersEvent(headerEvent.fields(), headerEvent.endStream());
774768
} else if (state.getReadState() == RS_DONE) {
775769
throw new IOException("Stream ended before response headers received");
776770
}

io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,31 @@ void writeTo() throws IOException {
2727
var ds = DataStream.ofPublisher(publisher, null, -1);
2828
var out = new ByteArrayOutputStream();
2929

30-
Thread.startVirtualThread(() -> {
31-
publisher.submit(ByteBuffer.wrap(chunk1));
32-
publisher.submit(ByteBuffer.wrap(chunk2));
33-
publisher.close();
30+
// Run writeTo on a virtual thread so it subscribes to the publisher
31+
// before items are submitted, avoiding a race where close() fires
32+
// before the subscription is established.
33+
var writeThread = Thread.startVirtualThread(() -> {
34+
try {
35+
ds.writeTo(out);
36+
} catch (IOException e) {
37+
throw new RuntimeException(e);
38+
}
3439
});
3540

36-
ds.writeTo(out);
41+
// Wait for writeTo's subscriber to be registered.
42+
while (publisher.getNumberOfSubscribers() < 1) {
43+
Thread.onSpinWait();
44+
}
45+
46+
publisher.submit(ByteBuffer.wrap(chunk1));
47+
publisher.submit(ByteBuffer.wrap(chunk2));
48+
publisher.close();
49+
50+
try {
51+
writeThread.join();
52+
} catch (InterruptedException e) {
53+
throw new RuntimeException(e);
54+
}
3755

3856
assertArrayEquals("hello world".getBytes(StandardCharsets.UTF_8), out.toByteArray());
3957
}

0 commit comments

Comments
 (0)