Skip to content

Commit aeea24b

Browse files
committed
test: increase java sdk coverage
1 parent 442a6c3 commit aeea24b

8 files changed

Lines changed: 1333 additions & 1 deletion

File tree

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package dev.arcp.client;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5+
6+
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
7+
import dev.arcp.core.auth.Auth;
8+
import dev.arcp.core.capabilities.AgentDescriptor;
9+
import dev.arcp.core.capabilities.Feature;
10+
import dev.arcp.core.credentials.Credential;
11+
import dev.arcp.core.credentials.CredentialId;
12+
import dev.arcp.core.credentials.CredentialScheme;
13+
import dev.arcp.core.ids.JobId;
14+
import dev.arcp.core.ids.SessionId;
15+
import dev.arcp.core.lease.LeaseConstraints;
16+
import dev.arcp.core.messages.JobAccepted;
17+
import dev.arcp.core.messages.JobSummary;
18+
import dev.arcp.core.transport.MemoryTransport;
19+
import dev.arcp.core.wire.ArcpMapper;
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
import java.util.EnumSet;
23+
import java.util.List;
24+
import java.util.Optional;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import org.junit.jupiter.api.Test;
28+
29+
class ClientValueTypesTest {
30+
31+
@Test
32+
void pageSubscribeOptionsAndSessionDefensivelyCopyInputs() {
33+
Page<JobSummary> empty = Page.empty();
34+
assertThat(empty.items()).isEmpty();
35+
assertThat(empty.hasNext()).isFalse();
36+
37+
Page<String> page = new Page<>(List.of("a", "b"), "next");
38+
assertThat(page.items()).containsExactly("a", "b");
39+
assertThat(page.hasNext()).isTrue();
40+
41+
assertThat(SubscribeOptions.live()).isEqualTo(new SubscribeOptions(false, 0));
42+
assertThat(SubscribeOptions.withHistory(4)).isEqualTo(new SubscribeOptions(true, 4));
43+
44+
EnumSet<Feature> mutableFeatures = EnumSet.of(Feature.SUBSCRIBE);
45+
List<AgentDescriptor> agents = List.of(new AgentDescriptor("echo", List.of("1.0.0"), "1.0.0"));
46+
Session session =
47+
new Session(
48+
SessionId.of("sess_client"), mutableFeatures, "resume", Duration.ofSeconds(30), agents);
49+
mutableFeatures.add(Feature.LIST_JOBS);
50+
51+
assertThat(session.negotiatedFeatures()).containsExactly(Feature.SUBSCRIBE);
52+
assertThat(session.availableAgents()).isEqualTo(agents);
53+
}
54+
55+
@Test
56+
void jobHandleCredentialDefaultReturnsOptionalCredentials() {
57+
Credential credential =
58+
new Credential(
59+
CredentialId.of("cred_1"),
60+
CredentialScheme.BEARER,
61+
"secret",
62+
"https://api.example.test",
63+
null,
64+
null);
65+
JobHandle withCredentials =
66+
new FakeJobHandle(
67+
new JobAccepted(
68+
JobId.of("job_with_creds"),
69+
"echo@1.0.0",
70+
null,
71+
null,
72+
null,
73+
List.of(credential),
74+
Instant.parse("2026-05-25T12:00:00Z"),
75+
null));
76+
assertThat(withCredentials.credentials()).contains(List.of(credential));
77+
78+
JobHandle withoutCredentials =
79+
new FakeJobHandle(
80+
new JobAccepted(
81+
JobId.of("job_without_creds"),
82+
"echo@1.0.0",
83+
null,
84+
null,
85+
null,
86+
null,
87+
Instant.parse("2026-05-25T12:00:00Z"),
88+
null));
89+
assertThat(withoutCredentials.credentials()).isEqualTo(Optional.empty());
90+
}
91+
92+
@Test
93+
void clientBuilderAndJobSubmitHelpersPopulateConfiguration() {
94+
MemoryTransport.Pair pair = MemoryTransport.pair();
95+
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
96+
try (ArcpClient client =
97+
ArcpClient.builder(pair.client())
98+
.mapper(ArcpMapper.shared())
99+
.client("custom-client", "2.0.0")
100+
.auth(Auth.anonymous())
101+
.bearer("token")
102+
.features(EnumSet.of(Feature.SUBSCRIBE))
103+
.autoAck(false)
104+
.ackInterval(Duration.ofSeconds(1))
105+
.scheduler(scheduler)
106+
.resumeToken("resume-token")
107+
.lastEventSeq(7)
108+
.build()) {
109+
assertThat(client.lastSeenSeq()).isEqualTo(-1);
110+
assertThat(
111+
ArcpClient.jobSubmit("echo@1.0.0", JsonNodeFactory.instance.objectNode())
112+
.agent()
113+
.wire())
114+
.isEqualTo("echo@1.0.0");
115+
assertThat(
116+
ArcpClient.jobSubmit(
117+
"echo@1.0.0",
118+
JsonNodeFactory.instance.objectNode(),
119+
null,
120+
LeaseConstraints.of(Instant.now().plusSeconds(60)),
121+
"key",
122+
5)
123+
.idempotencyKey())
124+
.isEqualTo("key");
125+
assertThatThrownBy(
126+
() ->
127+
ArcpClient.jobSubmit(
128+
"echo@1.0.0",
129+
JsonNodeFactory.instance.objectNode(),
130+
null,
131+
LeaseConstraints.of(Instant.now().minusSeconds(60)),
132+
null,
133+
null))
134+
.isInstanceOf(IllegalArgumentException.class)
135+
.hasMessageContaining("expires_at");
136+
} finally {
137+
scheduler.shutdownNow();
138+
pair.runtime().close();
139+
}
140+
}
141+
142+
private record FakeJobHandle(JobAccepted accepted) implements JobHandle {
143+
@Override
144+
public JobId jobId() {
145+
return accepted.jobId();
146+
}
147+
148+
@Override
149+
public String resolvedAgent() {
150+
return accepted.agent();
151+
}
152+
153+
@Override
154+
public java.util.concurrent.Flow.Publisher<dev.arcp.core.events.EventBody> events() {
155+
return subscriber -> {};
156+
}
157+
158+
@Override
159+
public java.util.concurrent.CompletableFuture<dev.arcp.core.messages.JobResult> result() {
160+
return new java.util.concurrent.CompletableFuture<>();
161+
}
162+
163+
@Override
164+
public void cancel() {}
165+
}
166+
}

arcp-client/src/test/java/dev/arcp/client/ResultStreamTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
import dev.arcp.core.events.ResultChunkEvent;
77
import dev.arcp.core.ids.ResultId;
8+
import java.io.ByteArrayOutputStream;
9+
import java.io.FilterOutputStream;
810
import java.nio.charset.StandardCharsets;
911
import java.util.Base64;
1012
import org.junit.jupiter.api.Test;
@@ -54,4 +56,61 @@ void duplicateChunkRejected() throws Exception {
5456
assertThatThrownBy(() -> stream.accept(new ResultChunkEvent(id, 0, "a", "utf8", true)))
5557
.isInstanceOf(ResultStream.DuplicateChunkException.class);
5658
}
59+
60+
@Test
61+
void rejectsWrongResultIdAndEncodingSwitches() throws Exception {
62+
ResultStream stream = ResultStream.toMemory(ResultId.of("res_expected"));
63+
assertThatThrownBy(
64+
() ->
65+
stream.accept(
66+
new ResultChunkEvent(
67+
ResultId.of("res_other"), 0, "a", ResultChunkEvent.UTF8, false)))
68+
.isInstanceOf(IllegalArgumentException.class)
69+
.hasMessageContaining("wrong result_id");
70+
71+
ResultStream encoding = ResultStream.toMemory(ResultId.of("res_encoding"));
72+
encoding.accept(
73+
new ResultChunkEvent(ResultId.of("res_encoding"), 0, "a", ResultChunkEvent.UTF8, true));
74+
assertThatThrownBy(
75+
() ->
76+
encoding.accept(
77+
new ResultChunkEvent(
78+
ResultId.of("res_encoding"),
79+
1,
80+
Base64.getEncoder().encodeToString("b".getBytes(StandardCharsets.UTF_8)),
81+
ResultChunkEvent.BASE64,
82+
false)))
83+
.isInstanceOf(ResultStream.EncodingMismatchException.class);
84+
}
85+
86+
@Test
87+
void sinkModeTracksBytesAndRejectsInMemoryAccess() throws Exception {
88+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
89+
FilterOutputStream sink = new FilterOutputStream(bytes);
90+
ResultStream stream = ResultStream.toSink(ResultId.of("res_sink"), sink);
91+
stream.accept(new ResultChunkEvent(ResultId.of("res_sink"), 0, "abc", "utf8", false));
92+
assertThat(stream.bytesWritten()).isEqualTo(3);
93+
assertThat(bytes.toString(StandardCharsets.UTF_8)).isEqualTo("abc");
94+
assertThatThrownBy(stream::bytes)
95+
.isInstanceOf(IllegalStateException.class)
96+
.hasMessageContaining("not in-memory");
97+
assertThatThrownBy(
98+
() ->
99+
stream.accept(
100+
new ResultChunkEvent(ResultId.of("res_sink"), 1, "late", "utf8", false)))
101+
.isInstanceOf(IllegalStateException.class)
102+
.hasMessageContaining("already closed");
103+
}
104+
105+
@Test
106+
void terminalChunkWithPendingBeyondItIsRejected() throws Exception {
107+
ResultStream stream = ResultStream.toMemory(ResultId.of("res_terminal"));
108+
stream.accept(new ResultChunkEvent(ResultId.of("res_terminal"), 1, "late", "utf8", true));
109+
assertThatThrownBy(
110+
() ->
111+
stream.accept(
112+
new ResultChunkEvent(ResultId.of("res_terminal"), 0, "done", "utf8", false)))
113+
.isInstanceOf(ResultStream.OutOfOrderChunkException.class)
114+
.hasMessageContaining("chunks beyond terminal");
115+
}
57116
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package dev.arcp.client;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
6+
import dev.arcp.core.ids.MessageId;
7+
import dev.arcp.core.wire.ArcpMapper;
8+
import dev.arcp.core.wire.Envelope;
9+
import java.lang.reflect.Constructor;
10+
import java.lang.reflect.Method;
11+
import java.net.http.WebSocket;
12+
import java.nio.ByteBuffer;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.concurrent.BlockingQueue;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.Flow;
18+
import java.util.concurrent.LinkedBlockingQueue;
19+
import java.util.concurrent.TimeUnit;
20+
import org.junit.jupiter.api.Test;
21+
22+
class WebSocketTransportTest {
23+
24+
@Test
25+
void sendHandleTextAndCloseWorkAgainstAttachedSocket() throws Exception {
26+
WebSocketTransport transport = newTransport();
27+
FakeWebSocket socket = new FakeWebSocket();
28+
transport.attachSocket(socket);
29+
BlockingQueue<Envelope> received = new LinkedBlockingQueue<>();
30+
transport.incoming().subscribe(queueingSubscriber(received));
31+
32+
Envelope outbound = envelope("session.ping", "m_out");
33+
transport.send(outbound);
34+
assertThat(socket.textFrames).hasSize(1);
35+
assertThat(socket.textFrames.getFirst()).contains("\"type\":\"session.ping\"");
36+
37+
Method handleText =
38+
WebSocketTransport.class.getDeclaredMethod("handleText", CharSequence.class, boolean.class);
39+
handleText.setAccessible(true);
40+
handleText.invoke(transport, "{\"arcp\":\"1.1\",\"id\":\"m_in\",", false);
41+
assertThat(received.poll(100, TimeUnit.MILLISECONDS)).isNull();
42+
handleText.invoke(transport, "\"type\":\"session.pong\",\"payload\":{}}", true);
43+
assertThat(received.poll(2, TimeUnit.SECONDS)).isEqualTo(envelope("session.pong", "m_in"));
44+
45+
handleText.invoke(transport, "not-json", true);
46+
transport.close();
47+
transport.close();
48+
assertThat(socket.closeFrames).contains(1000);
49+
}
50+
51+
private static WebSocketTransport newTransport() throws Exception {
52+
Constructor<WebSocketTransport> constructor =
53+
WebSocketTransport.class.getDeclaredConstructor(
54+
com.fasterxml.jackson.databind.ObjectMapper.class, java.net.http.HttpClient.class);
55+
constructor.setAccessible(true);
56+
return constructor.newInstance(ArcpMapper.shared(), null);
57+
}
58+
59+
private static Envelope envelope(String type, String id) {
60+
return new Envelope(
61+
Envelope.VERSION,
62+
MessageId.of(id),
63+
type,
64+
null,
65+
null,
66+
null,
67+
null,
68+
JsonNodeFactory.instance.objectNode());
69+
}
70+
71+
private static Flow.Subscriber<Envelope> queueingSubscriber(BlockingQueue<Envelope> queue) {
72+
return new Flow.Subscriber<>() {
73+
@Override
74+
public void onSubscribe(Flow.Subscription subscription) {
75+
subscription.request(Long.MAX_VALUE);
76+
}
77+
78+
@Override
79+
public void onNext(Envelope item) {
80+
queue.add(item);
81+
}
82+
83+
@Override
84+
public void onError(Throwable throwable) {}
85+
86+
@Override
87+
public void onComplete() {}
88+
};
89+
}
90+
91+
private static final class FakeWebSocket implements WebSocket {
92+
private final List<String> textFrames = new ArrayList<>();
93+
private final List<Integer> closeFrames = new ArrayList<>();
94+
95+
@Override
96+
public CompletableFuture<WebSocket> sendText(CharSequence data, boolean last) {
97+
textFrames.add(data.toString());
98+
return CompletableFuture.completedFuture(this);
99+
}
100+
101+
@Override
102+
public CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean last) {
103+
return CompletableFuture.completedFuture(this);
104+
}
105+
106+
@Override
107+
public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
108+
return CompletableFuture.completedFuture(this);
109+
}
110+
111+
@Override
112+
public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
113+
return CompletableFuture.completedFuture(this);
114+
}
115+
116+
@Override
117+
public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
118+
closeFrames.add(statusCode);
119+
return CompletableFuture.completedFuture(this);
120+
}
121+
122+
@Override
123+
public void request(long n) {}
124+
125+
@Override
126+
public String getSubprotocol() {
127+
return "";
128+
}
129+
130+
@Override
131+
public boolean isOutputClosed() {
132+
return false;
133+
}
134+
135+
@Override
136+
public boolean isInputClosed() {
137+
return false;
138+
}
139+
140+
@Override
141+
public void abort() {}
142+
}
143+
}

0 commit comments

Comments
 (0)