From 53ab6ee11335c7de14024af4e4a8f18adea90d39 Mon Sep 17 00:00:00 2001 From: Erlend Hamnaberg Date: Fri, 10 Feb 2023 11:06:33 +0100 Subject: [PATCH 1/2] Use java.util.concurrent.Flow types * Adds dependency on reactive-streams-tck-flow and flow-adapters * Convert between reactive-streams and flow where needed * Use correct resource filtering directory * Use UTF-8 for resource filtering * Use Java 9 apis using --release flag --- netty-reactive-streams-http/pom.xml | 6 +- .../http/DefaultStreamedHttpRequest.java | 4 +- .../http/DefaultStreamedHttpResponse.java | 5 +- .../http/DefaultWebSocketHttpResponse.java | 6 +- .../http/DelegateStreamedHttpRequest.java | 4 +- .../http/DelegateStreamedHttpResponse.java | 4 +- .../netty/http/HttpStreamsClientHandler.java | 6 +- .../netty/http/HttpStreamsHandler.java | 4 +- .../netty/http/HttpStreamsServerHandler.java | 2 +- .../netty/http/StreamedHttpMessage.java | 2 +- .../netty/http/WebSocketHttpResponse.java | 2 +- .../typesafe/netty/http/AkkaStreamsUtil.java | 17 ++- .../netty/http/DelegateProcessor.java | 8 +- ...HttpIdentityProcessorVerificationTest.java | 25 ++-- .../com/typesafe/netty/http/HttpHelper.java | 13 +- .../typesafe/netty/http/HttpStreamsTest.java | 3 +- .../netty/http/ProcessorHttpClient.java | 2 +- .../netty/http/ProcessorHttpServer.java | 2 +- .../typesafe/netty/http/WebSocketsTest.java | 5 +- netty-reactive-streams/pom.xml | 10 +- .../typesafe/netty/CancelledSubscriber.java | 5 +- .../com/typesafe/netty/HandlerPublisher.java | 6 +- .../com/typesafe/netty/HandlerSubscriber.java | 5 +- .../typesafe/netty/ChannelPublisherTest.java | 7 +- .../HandlerPublisherVerificationTest.java | 10 +- ...lerSubscriberBlackboxVerificationTest.java | 12 +- ...lerSubscriberWhiteboxVerificationTest.java | 9 +- .../java/com/typesafe/netty/ProbeHandler.java | 12 +- .../typesafe/netty/probe/PublisherProbe.java | 4 +- .../typesafe/netty/probe/SubscriberProbe.java | 4 +- pom.xml | 130 +++++++++--------- 31 files changed, 171 insertions(+), 163 deletions(-) diff --git a/netty-reactive-streams-http/pom.xml b/netty-reactive-streams-http/pom.xml index 7f2437dd..bb3874d7 100644 --- a/netty-reactive-streams-http/pom.xml +++ b/netty-reactive-streams-http/pom.xml @@ -5,7 +5,7 @@ com.typesafe.netty netty-reactive-streams-parent - 2.1.0-SNAPSHOT + 3.0.0-SNAPSHOT netty-reactive-streams-http @@ -25,7 +25,7 @@ org.reactivestreams - reactive-streams-tck + reactive-streams-tck-flow org.testng @@ -51,7 +51,7 @@ copy-resources - ${project.build.resources[0].directory}/META-INF + ${project.build.outputDirectory}/META-INF ../ diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpRequest.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpRequest.java index d11f5207..1e9d6d7c 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpRequest.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpRequest.java @@ -4,8 +4,8 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; /** * A default streamed HTTP request. diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpResponse.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpResponse.java index e621794d..e99830ff 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpResponse.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultStreamedHttpResponse.java @@ -4,8 +4,9 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; + +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; /** * A default streamed HTTP response. diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultWebSocketHttpResponse.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultWebSocketHttpResponse.java index 75a5e9e3..079c707b 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultWebSocketHttpResponse.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DefaultWebSocketHttpResponse.java @@ -5,9 +5,9 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; -import org.reactivestreams.Processor; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Processor; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; /** * A default WebSocket HTTP response. diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpRequest.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpRequest.java index bd7c4a16..1e0c68cf 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpRequest.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpRequest.java @@ -2,8 +2,8 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpRequest; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; final class DelegateStreamedHttpRequest extends DelegateHttpRequest implements StreamedHttpRequest { diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpResponse.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpResponse.java index 47e21a1f..75182f66 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpResponse.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/DelegateStreamedHttpResponse.java @@ -2,8 +2,8 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponse; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; final class DelegateStreamedHttpResponse extends DelegateHttpResponse implements StreamedHttpResponse { diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsClientHandler.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsClientHandler.java index 3fe734f2..6e8a5a99 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsClientHandler.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsClientHandler.java @@ -5,9 +5,9 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.*; import io.netty.util.ReferenceCountUtil; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; /** * Handler that converts written {@link StreamedHttpRequest} messages into {@link HttpRequest} messages diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsHandler.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsHandler.java index 06a13c1d..5ac7f37f 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsHandler.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsHandler.java @@ -5,8 +5,8 @@ import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.util.ReferenceCountUtil; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; import java.util.LinkedList; import java.util.Queue; diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsServerHandler.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsServerHandler.java index 68877c4c..45888402 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsServerHandler.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/HttpStreamsServerHandler.java @@ -8,7 +8,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import org.reactivestreams.Publisher; +import java.util.concurrent.Flow.Publisher; import java.util.Collections; import java.util.List; diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/StreamedHttpMessage.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/StreamedHttpMessage.java index 9690de69..1058295f 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/StreamedHttpMessage.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/StreamedHttpMessage.java @@ -2,7 +2,7 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMessage; -import org.reactivestreams.Publisher; +import java.util.concurrent.Flow.Publisher; /** * Combines {@link HttpMessage} and {@link Publisher} into one diff --git a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/WebSocketHttpResponse.java b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/WebSocketHttpResponse.java index 467ff653..0614288c 100644 --- a/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/WebSocketHttpResponse.java +++ b/netty-reactive-streams-http/src/main/java/com/typesafe/netty/http/WebSocketHttpResponse.java @@ -3,7 +3,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; -import org.reactivestreams.Processor; +import java.util.concurrent.Flow.Processor; /** diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java index 38487c4f..23b1384d 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java @@ -4,27 +4,26 @@ import akka.japi.function.Creator; import akka.stream.Materializer; import akka.stream.javadsl.*; -import org.reactivestreams.Processor; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import org.reactivestreams.FlowAdapters; +import java.util.concurrent.Flow.Processor; public class AkkaStreamsUtil { public static Processor flowToProcessor(Flow flow, Materializer materializer) { - Pair, Publisher> pair = + Pair, org.reactivestreams.Publisher> pair = Source.asSubscriber() .via(flow) - .toMat(Sink.asPublisher(AsPublisher.WITH_FANOUT), Keep., Publisher>both()) + .toMat(Sink.asPublisher(AsPublisher.WITH_FANOUT), Keep.both()) .run(materializer); - return new DelegateProcessor<>(pair.first(), pair.second()); + return new DelegateProcessor<>(FlowAdapters.toFlowSubscriber(pair.first()), FlowAdapters.toFlowPublisher(pair.second())); } public static Flow processorToFlow(final Processor processor) { - return Flow.fromProcessor(new Creator>() { + return Flow.fromProcessor(new Creator>() { @Override - public Processor create() throws Exception { - return processor; + public org.reactivestreams.Processor create() throws Exception { + return FlowAdapters.toProcessor(processor); } }); } diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/DelegateProcessor.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/DelegateProcessor.java index fbcc8f87..07776283 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/DelegateProcessor.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/DelegateProcessor.java @@ -1,9 +1,9 @@ package com.typesafe.netty.http; -import org.reactivestreams.Processor; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Processor; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; public class DelegateProcessor implements Processor { diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java index 840d6df8..40486cd2 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java @@ -8,9 +8,8 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.http.*; -import org.reactivestreams.Processor; -import org.reactivestreams.Publisher; -import org.reactivestreams.tck.IdentityProcessorVerification; +import org.reactivestreams.FlowAdapters; +import org.reactivestreams.tck.flow.IdentityFlowProcessorVerification; import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.*; import scala.concurrent.Await; @@ -24,21 +23,24 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.Flow.Processor; +import java.util.concurrent.Flow.Publisher; + /** * This identity processor verification verifies a client making requests to a server, that echos the * body back. - * + *

* The server uses the {@link HttpStreamsServerHandler}, and then exposes the messages sent/received by * that using reactive streams. So it effectively uses streams of streams. It then uses Akka streams * to actually handle the requests, echoing the bodies back in the responses as is. - * + *

* The client uses the {@link HttpStreamsClientHandler}, and then exposes the messages sent/received by * that using reactive streams, so it too is effectively a stream of streams. Here Akka streams is used * to split the String bodies into many chunks, for more interesting verification of the bodies, and then * combines all the chunks together back into a String at the end. */ -public class FullStackHttpIdentityProcessorVerificationTest extends IdentityProcessorVerification { +public class FullStackHttpIdentityProcessorVerificationTest extends IdentityFlowProcessorVerification { private NioEventLoopGroup eventLoop; private Channel serverBindChannel; @@ -86,7 +88,7 @@ public void stop() throws Exception { } @Override - public Processor createIdentityProcessor(int bufferSize) { + public Processor createIdentityFlowProcessor(int bufferSize) { ProcessorHttpClient client = new ProcessorHttpClient(eventLoop); Processor connection = getProcessor(client); @@ -98,7 +100,7 @@ public Processor createIdentityProcessor(int bufferSize) { public HttpRequest apply(String body) throws Exception { List content = new ArrayList<>(); String[] chunks = body.split(":"); - for (String chunk: chunks) { + for (String chunk : chunks) { // Make sure we put the ":" back into the body String c; if (content.isEmpty()) { @@ -133,9 +135,10 @@ private Processor getProcessor(ProcessorHttpClient cl } @Override - public Publisher createFailedPublisher() { - return Source.failed(new RuntimeException("failed")) - .toMat(Sink.asPublisher(AsPublisher.WITH_FANOUT), Keep.>right()).run(materializer); + public Publisher createFailedFlowPublisher() { + return FlowAdapters.toFlowPublisher( + Source.failed(new RuntimeException("failed")) + .toMat(Sink.asPublisher(AsPublisher.WITH_FANOUT), Keep.>right()).run(materializer)); } @Override diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java index f12e167b..dc19e15e 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java @@ -9,6 +9,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.*; import io.netty.util.ReferenceCountUtil; +import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; import java.nio.charset.Charset; @@ -72,12 +73,12 @@ public HttpResponse echo(Object msg) { public StreamedHttpRequest createStreamedRequest(String method, String uri, List body) { List content = new ArrayList<>(); - for (String chunk: body) { + for (String chunk : body) { content.add(new DefaultHttpContent(Unpooled.copiedBuffer(chunk, Charset.forName("utf-8")))); } Publisher publisher = Source.from(content).runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); return new DefaultStreamedHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri, - publisher); + FlowAdapters.toFlowPublisher(publisher)); } public StreamedHttpRequest createStreamedRequest(String method, String uri, List body, long contentLength) { @@ -101,11 +102,11 @@ public FullHttpResponse createFullResponse(String body) { public StreamedHttpResponse createStreamedResponse(HttpVersion version, List body, long contentLength) { List content = new ArrayList<>(); - for (String chunk: body) { + for (String chunk : body) { content.add(new DefaultHttpContent(Unpooled.copiedBuffer(chunk, Charset.forName("utf-8")))); } Publisher publisher = Source.from(content).runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); - StreamedHttpResponse response = new DefaultStreamedHttpResponse(version, HttpResponseStatus.OK, publisher); + StreamedHttpResponse response = new DefaultStreamedHttpResponse(version, HttpResponseStatus.OK, FlowAdapters.toFlowPublisher(publisher)); HttpUtil.setContentLength(response, contentLength); return response; } @@ -119,7 +120,7 @@ public CompletionStage extractBodyAsync(Object msg) { String body = contentAsString((FullHttpMessage) msg); return CompletableFuture.completedFuture(body); } else if (msg instanceof StreamedHttpMessage) { - return Source.fromPublisher((StreamedHttpMessage) msg).runFold("", new Function2() { + return Source.fromPublisher(FlowAdapters.toPublisher((StreamedHttpMessage) msg)).runFold("", new Function2() { @Override public String apply(String body, HttpContent content) throws Exception { return body + contentAsString(content); @@ -156,7 +157,7 @@ public boolean hasRequestContentLength(HttpResponse response) { public void cancelStreamedMessage(Object msg) { if (msg instanceof StreamedHttpMessage) { - Source.fromPublisher((StreamedHttpMessage) msg).runWith(Sink.cancelled(), materializer); + Source.fromPublisher(FlowAdapters.toPublisher((StreamedHttpMessage) msg)).runWith(Sink.cancelled(), materializer); } else { throw new IllegalArgumentException("Unknown message type: " + msg); } diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java index d92bc9f2..ad491157 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java @@ -15,7 +15,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.*; -import org.reactivestreams.Processor; +import org.reactivestreams.FlowAdapters; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -30,6 +30,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.Flow.Processor; import static org.testng.Assert.*; diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpClient.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpClient.java index 32fd5b9b..25cfdf34 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpClient.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpClient.java @@ -8,7 +8,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.*; import io.netty.util.concurrent.*; -import org.reactivestreams.Processor; +import java.util.concurrent.Flow.Processor; import java.net.SocketAddress; diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpServer.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpServer.java index 21a08693..f3719eaf 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpServer.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/ProcessorHttpServer.java @@ -10,10 +10,10 @@ import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; -import org.reactivestreams.Processor; import java.net.SocketAddress; import java.util.concurrent.Callable; +import java.util.concurrent.Flow.Processor; public class ProcessorHttpServer { diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java index 2dd4a126..b90c7528 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Processor; +import org.reactivestreams.FlowAdapters; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -73,7 +74,7 @@ public WebSocketFrame apply(WebSocketFrame msg) throws Exception { }).toProcessor().run(materializer); ctx.writeAndFlush(new DefaultWebSocketHttpResponse(request.protocolVersion(), - HttpResponseStatus.valueOf(200), processor, + HttpResponseStatus.valueOf(200), FlowAdapters.toFlowProcessor(processor), new WebSocketServerHandshakerFactory("ws://127.0.0.1/" + port + "/", null, withExtensions) )); } @@ -139,7 +140,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception Processor processor = Flow.create().toProcessor().run(materializer); ctx.writeAndFlush(new DefaultWebSocketHttpResponse(request.protocolVersion(), - HttpResponseStatus.valueOf(200), processor, + HttpResponseStatus.valueOf(200), FlowAdapters.toFlowProcessor(processor), new WebSocketServerHandshakerFactory("ws://127.0.0.1/" + port + "/", null, false) )); } diff --git a/netty-reactive-streams/pom.xml b/netty-reactive-streams/pom.xml index f3b0b37e..0e063047 100644 --- a/netty-reactive-streams/pom.xml +++ b/netty-reactive-streams/pom.xml @@ -5,7 +5,7 @@ com.typesafe.netty netty-reactive-streams-parent - 2.1.0-SNAPSHOT + 3.0.0-SNAPSHOT netty-reactive-streams @@ -20,11 +20,7 @@ org.reactivestreams - reactive-streams - - - org.reactivestreams - reactive-streams-tck + reactive-streams-tck-flow org.testng @@ -45,7 +41,7 @@ copy-resources - ${project.build.resources[0].directory}/META-INF + ${project.build.outputDirectory}/META-INF ../ diff --git a/netty-reactive-streams/src/main/java/com/typesafe/netty/CancelledSubscriber.java b/netty-reactive-streams/src/main/java/com/typesafe/netty/CancelledSubscriber.java index b67ee698..8708f47f 100644 --- a/netty-reactive-streams/src/main/java/com/typesafe/netty/CancelledSubscriber.java +++ b/netty-reactive-streams/src/main/java/com/typesafe/netty/CancelledSubscriber.java @@ -1,7 +1,8 @@ package com.typesafe.netty; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + /** * A cancelled subscriber. diff --git a/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerPublisher.java b/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerPublisher.java index b1c34aaf..558ef6ec 100644 --- a/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerPublisher.java +++ b/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerPublisher.java @@ -7,9 +7,9 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.TypeParameterMatcher; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import static com.typesafe.netty.HandlerPublisher.State.*; import java.util.*; diff --git a/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerSubscriber.java b/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerSubscriber.java index 09b4f5c8..ce40103d 100644 --- a/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerSubscriber.java +++ b/netty-reactive-streams/src/main/java/com/typesafe/netty/HandlerSubscriber.java @@ -5,8 +5,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + import java.util.concurrent.atomic.AtomicBoolean; diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/ChannelPublisherTest.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/ChannelPublisherTest.java index 0f36b917..e72493c3 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/ChannelPublisherTest.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/ChannelPublisherTest.java @@ -6,9 +6,10 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Promise; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerPublisherVerificationTest.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerPublisherVerificationTest.java index c7c71c1c..4449a913 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerPublisherVerificationTest.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerPublisherVerificationTest.java @@ -4,8 +4,8 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.local.LocalChannel; -import org.reactivestreams.Publisher; -import org.reactivestreams.tck.PublisherVerification; +import java.util.concurrent.Flow.Publisher; +import org.reactivestreams.tck.flow.FlowPublisherVerification; import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.*; @@ -15,7 +15,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -public class HandlerPublisherVerificationTest extends PublisherVerification { +public class HandlerPublisherVerificationTest extends FlowPublisherVerification { private final int batchSize; // The number of elements to publish initially, before the subscriber is received @@ -98,7 +98,7 @@ public void stopExecutor() { } @Override - public Publisher createPublisher(final long elements) { + public Publisher createFlowPublisher(final long elements) { final BatchedProducer out; if (scheduled) { out = new ScheduledBatchedProducer(elements, batchSize, publishInitial, executor, 5); @@ -131,7 +131,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } @Override - public Publisher createFailedPublisher() { + public Publisher createFailedFlowPublisher() { LocalChannel channel = new LocalChannel(); eventLoop.register(channel); HandlerPublisher publisher = new HandlerPublisher<>(channel.eventLoop(), Long.class); diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberBlackboxVerificationTest.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberBlackboxVerificationTest.java index 3bb7d77d..a5535a25 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberBlackboxVerificationTest.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberBlackboxVerificationTest.java @@ -3,19 +3,19 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedChannel; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.reactivestreams.tck.SubscriberBlackboxVerification; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import org.reactivestreams.tck.flow.FlowSubscriberBlackboxVerification; import org.reactivestreams.tck.TestEnvironment; -public class HandlerSubscriberBlackboxVerificationTest extends SubscriberBlackboxVerification { +public class HandlerSubscriberBlackboxVerificationTest extends FlowSubscriberBlackboxVerification { public HandlerSubscriberBlackboxVerificationTest() { super(new TestEnvironment()); } @Override - public Subscriber createSubscriber() { + public Subscriber createFlowSubscriber() { // Embedded channel requires at least one handler when it's created, but HandlerSubscriber // needs the channels event loop in order to be created, so start with a dummy, then replace. ChannelHandler dummy = new ChannelDuplexHandler(); @@ -32,7 +32,7 @@ public Long createElement(int element) { } @Override - public void triggerRequest(Subscriber subscriber) { + public void triggerFlowRequest(Subscriber subscriber) { EmbeddedChannel channel = ((SubscriberWithChannel) subscriber).channel; channel.runPendingTasks(); diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberWhiteboxVerificationTest.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberWhiteboxVerificationTest.java index f2cd83c2..96ab2786 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberWhiteboxVerificationTest.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/HandlerSubscriberWhiteboxVerificationTest.java @@ -3,13 +3,14 @@ import io.netty.channel.*; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Promise; -import org.reactivestreams.Subscriber; -import org.reactivestreams.tck.SubscriberWhiteboxVerification; +import org.reactivestreams.tck.flow.FlowSubscriberWhiteboxVerification; import org.reactivestreams.tck.TestEnvironment; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -public class HandlerSubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification { +import java.util.concurrent.Flow.Subscriber; + +public class HandlerSubscriberWhiteboxVerificationTest extends FlowSubscriberWhiteboxVerification { private boolean workAroundIssue277; @@ -35,7 +36,7 @@ public void stopEventLoop() { } @Override - public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { + public Subscriber createFlowSubscriber(WhiteboxSubscriberProbe probe) { final ClosedLoopChannel channel = new ClosedLoopChannel(); diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/ProbeHandler.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/ProbeHandler.java index 8ac5d4a8..2b526fca 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/ProbeHandler.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/ProbeHandler.java @@ -3,21 +3,21 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.reactivestreams.tck.SubscriberWhiteboxVerification; +import org.reactivestreams.tck.flow.FlowSubscriberWhiteboxVerification; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; -public class ProbeHandler extends ChannelDuplexHandler implements SubscriberWhiteboxVerification.SubscriberPuppet { +public class ProbeHandler extends ChannelDuplexHandler implements FlowSubscriberWhiteboxVerification.SubscriberPuppet { private static final int NO_CONTEXT = 0; private static final int RUN = 1; private static final int CANCEL = 2; - private final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe; + private final FlowSubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe; private final Class clazz; private final Queue queue = new LinkedList<>(); private final AtomicInteger state = new AtomicInteger(NO_CONTEXT); @@ -25,7 +25,7 @@ public class ProbeHandler extends ChannelDuplexHandler implements SubscriberW // Netty doesn't provide a way to send errors out, so we capture whether it was an error or complete here private volatile Throwable receivedError = null; - public ProbeHandler(SubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe, Class clazz) { + public ProbeHandler(FlowSubscriberWhiteboxVerification.WhiteboxSubscriberProbe probe, Class clazz) { this.probe = probe; this.clazz = clazz; } diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/PublisherProbe.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/PublisherProbe.java index 435984f5..74690b17 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/PublisherProbe.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/PublisherProbe.java @@ -1,7 +1,7 @@ package com.typesafe.netty.probe; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Publisher; public class PublisherProbe extends Probe implements Publisher { diff --git a/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/SubscriberProbe.java b/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/SubscriberProbe.java index 85a07c1e..e70a2320 100644 --- a/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/SubscriberProbe.java +++ b/netty-reactive-streams/src/test/java/com/typesafe/netty/probe/SubscriberProbe.java @@ -1,7 +1,7 @@ package com.typesafe.netty.probe; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; public class SubscriberProbe extends Probe implements Subscriber { diff --git a/pom.xml b/pom.xml index 0c694123..3f877d53 100644 --- a/pom.xml +++ b/pom.xml @@ -1,10 +1,11 @@ - + 4.0.0 com.typesafe.netty netty-reactive-streams-parent - 2.1.0-SNAPSHOT + 3.0.0-SNAPSHOT Netty Reactive Streams Parent POM Reactive streams implementation for Netty. @@ -59,19 +60,20 @@ org.reactivestreams - reactive-streams + reactive-streams-tck-flow ${reactive-streams.version} + test org.reactivestreams - reactive-streams-tck - ${reactive-streams.version} + reactive-streams-flow-adapters + 1.0.2 test org.testng testng - 7.5 + 7.7.1 test @@ -89,67 +91,67 @@ 2.6.20 5.1.8 3.3.0 + UTF-8 - - - org.apache.maven.plugins - maven-compiler-plugin - 3.10.1 - - 1.7 - 1.7 - - - - org.sonatype.plugins - nexus-staging-maven-plugin - 1.6.13 - true - - ossrh - https://oss.sonatype.org/ - true - - - - org.apache.maven.plugins - maven-release-plugin - 2.5.3 - - true - false - release - deploy - - - - org.apache.maven.plugins - maven-jar-plugin - ${maven-jar-plugin.version} - - - ${project.build.outputDirectory}/META-INF/MANIFEST.MF - - - - - org.apache.felix - maven-bundle-plugin - ${maven-bundle-plugin.version} - - - bundle-manifest - process-classes - - manifest - - - - - - + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + 9 + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.13 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-release-plugin + 2.5.3 + + true + false + release + deploy + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + ${project.build.outputDirectory}/META-INF/MANIFEST.MF + + + + + org.apache.felix + maven-bundle-plugin + ${maven-bundle-plugin.version} + + + bundle-manifest + process-classes + + manifest + + + + + + From e344d0d5b11b1cdbacc878a3d5dfbe706806dadd Mon Sep 17 00:00:00 2001 From: Erlend Hamnaberg Date: Fri, 10 Feb 2023 11:09:45 +0100 Subject: [PATCH 2/2] Drop JDK 8 from build --- .github/workflows/build-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index d2cdc46f..cc953798 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [ '8', '11', '17' ] + java: [ '11', '17' ] os: [ 'ubuntu-latest' ] runs-on: ${{ matrix.os }} steps: