Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
build:
strategy:
matrix:
java: [ '8', '11', '17' ]
java: [ '11', '17' ]
os: [ 'ubuntu-latest' ]
runs-on: ${{ matrix.os }}
steps:
Expand Down
6 changes: 3 additions & 3 deletions netty-reactive-streams-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.typesafe.netty</groupId>
<artifactId>netty-reactive-streams-parent</artifactId>
<version>2.1.0-SNAPSHOT</version>
<version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>netty-reactive-streams-http</artifactId>
Expand All @@ -25,7 +25,7 @@
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<artifactId>reactive-streams-tck-flow</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
Expand All @@ -51,7 +51,7 @@
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.resources[0].directory}/META-INF</outputDirectory>
<outputDirectory>${project.build.outputDirectory}/META-INF</outputDirectory>
<resources>
<resource>
<directory>../</directory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

/**
* A default streamed HTTP request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

/**
* A default streamed HTTP response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

/**
* A default WebSocket HTTP response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

final class DelegateStreamedHttpRequest extends DelegateHttpRequest implements StreamedHttpRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

final class DelegateStreamedHttpResponse extends DelegateHttpResponse implements StreamedHttpResponse {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.*;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

/**
* Handler that converts written {@link StreamedHttpRequest} messages into {@link HttpRequest} messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

import java.util.LinkedList;
import java.util.Queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.reactivestreams.Publisher;
import java.util.concurrent.Flow.Publisher;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import org.reactivestreams.Publisher;
import java.util.concurrent.Flow.Publisher;

/**
* Combines {@link HttpMessage} and {@link Publisher} into one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.reactivestreams.Processor;
import java.util.concurrent.Flow.Processor;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,26 @@
import akka.japi.function.Creator;
import akka.stream.Materializer;
import akka.stream.javadsl.*;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.FlowAdapters;
import java.util.concurrent.Flow.Processor;

public class AkkaStreamsUtil {

public static <In, Out> Processor<In, Out> flowToProcessor(Flow<In, Out, ?> flow, Materializer materializer) {
Pair<Subscriber<In>, Publisher<Out>> pair =
Pair<org.reactivestreams.Subscriber<In>, org.reactivestreams.Publisher<Out>> pair =
Source.<In>asSubscriber()
.via(flow)
.toMat(Sink.<Out>asPublisher(AsPublisher.WITH_FANOUT), Keep.<Subscriber<In>, Publisher<Out>>both())
.toMat(Sink.<Out>asPublisher(AsPublisher.WITH_FANOUT), Keep.both())
.run(materializer);

return new DelegateProcessor<>(pair.first(), pair.second());
return new DelegateProcessor<>(FlowAdapters.toFlowSubscriber(pair.first()), FlowAdapters.toFlowPublisher(pair.second()));
}

public static <In, Out> Flow<In, Out, ?> processorToFlow(final Processor<In, Out> processor) {
return Flow.fromProcessor(new Creator<Processor<In, Out>>() {
return Flow.fromProcessor(new Creator<org.reactivestreams.Processor<In, Out>>() {
@Override
public Processor<In, Out> create() throws Exception {
return processor;
public org.reactivestreams.Processor<In, Out> create() throws Exception {
return FlowAdapters.toProcessor(processor);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.typesafe.netty.http;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class DelegateProcessor<In, Out> implements Processor<In, Out> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http.*;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.IdentityProcessorVerification;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.tck.flow.IdentityFlowProcessorVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.*;
import scala.concurrent.Await;
Expand All @@ -24,21 +23,24 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Publisher;


/**
* This identity processor verification verifies a client making requests to a server, that echos the
* body back.
*
* <p>
* The server uses the {@link HttpStreamsServerHandler}, and then exposes the messages sent/received by
* that using reactive streams. So it effectively uses streams of streams. It then uses Akka streams
* to actually handle the requests, echoing the bodies back in the responses as is.
*
* <p>
* The client uses the {@link HttpStreamsClientHandler}, and then exposes the messages sent/received by
* that using reactive streams, so it too is effectively a stream of streams. Here Akka streams is used
* to split the String bodies into many chunks, for more interesting verification of the bodies, and then
* combines all the chunks together back into a String at the end.
*/
public class FullStackHttpIdentityProcessorVerificationTest extends IdentityProcessorVerification<String> {
public class FullStackHttpIdentityProcessorVerificationTest extends IdentityFlowProcessorVerification<String> {

private NioEventLoopGroup eventLoop;
private Channel serverBindChannel;
Expand Down Expand Up @@ -86,7 +88,7 @@ public void stop() throws Exception {
}

@Override
public Processor<String, String> createIdentityProcessor(int bufferSize) {
public Processor<String, String> createIdentityFlowProcessor(int bufferSize) {

ProcessorHttpClient client = new ProcessorHttpClient(eventLoop);
Processor<HttpRequest, HttpResponse> connection = getProcessor(client);
Expand All @@ -98,7 +100,7 @@ public Processor<String, String> createIdentityProcessor(int bufferSize) {
public HttpRequest apply(String body) throws Exception {
List<String> content = new ArrayList<>();
String[] chunks = body.split(":");
for (String chunk: chunks) {
for (String chunk : chunks) {
// Make sure we put the ":" back into the body
String c;
if (content.isEmpty()) {
Expand Down Expand Up @@ -133,9 +135,10 @@ private Processor<HttpRequest, HttpResponse> getProcessor(ProcessorHttpClient cl
}

@Override
public Publisher<String> createFailedPublisher() {
return Source.<String>failed(new RuntimeException("failed"))
.toMat(Sink.<String>asPublisher(AsPublisher.WITH_FANOUT), Keep.<NotUsed, Publisher<String>>right()).run(materializer);
public Publisher<String> createFailedFlowPublisher() {
return FlowAdapters.toFlowPublisher(
Source.<String>failed(new RuntimeException("failed"))
.toMat(Sink.<String>asPublisher(AsPublisher.WITH_FANOUT), Keep.<NotUsed, org.reactivestreams.Publisher<String>>right()).run(materializer));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.*;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -72,12 +73,12 @@ public HttpResponse echo(Object msg) {

public StreamedHttpRequest createStreamedRequest(String method, String uri, List<String> body) {
List<HttpContent> content = new ArrayList<>();
for (String chunk: body) {
for (String chunk : body) {
content.add(new DefaultHttpContent(Unpooled.copiedBuffer(chunk, Charset.forName("utf-8"))));
}
Publisher<HttpContent> publisher = Source.from(content).runWith(Sink.<HttpContent>asPublisher(AsPublisher.WITH_FANOUT), materializer);
return new DefaultStreamedHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri,
publisher);
FlowAdapters.toFlowPublisher(publisher));
}

public StreamedHttpRequest createStreamedRequest(String method, String uri, List<String> body, long contentLength) {
Expand All @@ -101,11 +102,11 @@ public FullHttpResponse createFullResponse(String body) {

public StreamedHttpResponse createStreamedResponse(HttpVersion version, List<String> body, long contentLength) {
List<HttpContent> content = new ArrayList<>();
for (String chunk: body) {
for (String chunk : body) {
content.add(new DefaultHttpContent(Unpooled.copiedBuffer(chunk, Charset.forName("utf-8"))));
}
Publisher<HttpContent> publisher = Source.from(content).runWith(Sink.<HttpContent>asPublisher(AsPublisher.WITH_FANOUT), materializer);
StreamedHttpResponse response = new DefaultStreamedHttpResponse(version, HttpResponseStatus.OK, publisher);
StreamedHttpResponse response = new DefaultStreamedHttpResponse(version, HttpResponseStatus.OK, FlowAdapters.toFlowPublisher(publisher));
HttpUtil.setContentLength(response, contentLength);
return response;
}
Expand All @@ -119,7 +120,7 @@ public CompletionStage<String> extractBodyAsync(Object msg) {
String body = contentAsString((FullHttpMessage) msg);
return CompletableFuture.completedFuture(body);
} else if (msg instanceof StreamedHttpMessage) {
return Source.fromPublisher((StreamedHttpMessage) msg).runFold("", new Function2<String, HttpContent, String>() {
return Source.fromPublisher(FlowAdapters.toPublisher((StreamedHttpMessage) msg)).runFold("", new Function2<String, HttpContent, String>() {
@Override
public String apply(String body, HttpContent content) throws Exception {
return body + contentAsString(content);
Expand Down Expand Up @@ -156,7 +157,7 @@ public boolean hasRequestContentLength(HttpResponse response) {

public void cancelStreamedMessage(Object msg) {
if (msg instanceof StreamedHttpMessage) {
Source.fromPublisher((StreamedHttpMessage) msg).runWith(Sink.<HttpContent>cancelled(), materializer);
Source.fromPublisher(FlowAdapters.toPublisher((StreamedHttpMessage) msg)).runWith(Sink.<HttpContent>cancelled(), materializer);
} else {
throw new IllegalArgumentException("Unknown message type: " + msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import org.reactivestreams.Processor;
import org.reactivestreams.FlowAdapters;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand All @@ -30,6 +30,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Flow.Processor;

import static org.testng.Assert.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.concurrent.*;
import org.reactivestreams.Processor;
import java.util.concurrent.Flow.Processor;

import java.net.SocketAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.reactivestreams.Processor;

import java.net.SocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow.Processor;

public class ProcessorHttpServer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Processor;
import org.reactivestreams.FlowAdapters;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -73,7 +74,7 @@ public WebSocketFrame apply(WebSocketFrame msg) throws Exception {
}).toProcessor().run(materializer);

ctx.writeAndFlush(new DefaultWebSocketHttpResponse(request.protocolVersion(),
HttpResponseStatus.valueOf(200), processor,
HttpResponseStatus.valueOf(200), FlowAdapters.toFlowProcessor(processor),
new WebSocketServerHandshakerFactory("ws://127.0.0.1/" + port + "/", null, withExtensions)
));
}
Expand Down Expand Up @@ -139,7 +140,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
Processor<WebSocketFrame, WebSocketFrame> processor = Flow.<WebSocketFrame>create().toProcessor().run(materializer);

ctx.writeAndFlush(new DefaultWebSocketHttpResponse(request.protocolVersion(),
HttpResponseStatus.valueOf(200), processor,
HttpResponseStatus.valueOf(200), FlowAdapters.toFlowProcessor(processor),
new WebSocketServerHandshakerFactory("ws://127.0.0.1/" + port + "/", null, false)
));
}
Expand Down
Loading