Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@

package io.dapr.examples.tracing;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.config.Properties;
import io.dapr.examples.OpenTelemetryConfig;
import io.dapr.utils.TypeRef;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.sdk.OpenTelemetrySdk;

import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
* 1. Build and install jars:
Expand All @@ -52,30 +54,50 @@ public static void main(String[] args) throws Exception {
Tracer tracer = openTelemetrySdk.getTracer(InvokeClient.class.getCanonicalName());
Span span = tracer.spanBuilder("Example's Main").setSpanKind(SpanKind.CLIENT).startSpan();

try (DaprClient client = (new DaprClientBuilder()).build()) {
for (String message : args) {
try (Scope scope = span.makeCurrent()) {
InvokeMethodRequest request = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_echo")
.setBody(message)
.setHttpExtension(HttpExtension.POST);
client.invokeMethod(request, TypeRef.get(byte[].class))
.map(r -> {
System.out.println(new String(r));
return r;
})
.flatMap(r -> {
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
}).contextWrite(getReactorContext()).block();
}
int port = Properties.HTTP_PORT.get();
String baseUrl = "http://localhost:" + port + "/v1.0/invoke/" + SERVICE_APP_ID + "/method/";

HttpClient httpClient = HttpClient.newHttpClient();

for (String message : args) {
try (Scope scope = span.makeCurrent()) {
// Call proxy_echo
HttpRequest.Builder echoBuilder = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "proxy_echo"))
.POST(HttpRequest.BodyPublishers.ofString(message));
injectTraceContext(echoBuilder);
addDaprApiToken(echoBuilder);
HttpResponse<byte[]> echoResponse =
httpClient.send(echoBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());
System.out.println(new String(echoResponse.body()));

// Call proxy_sleep
HttpRequest.Builder sleepBuilder = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "proxy_sleep"))
.POST(HttpRequest.BodyPublishers.noBody());
injectTraceContext(sleepBuilder);
addDaprApiToken(sleepBuilder);
httpClient.send(sleepBuilder.build(), HttpResponse.BodyHandlers.discarding());
}
}

span.end();
openTelemetrySdk.getSdkTracerProvider().shutdown();
Validation.validate();
System.out.println("Done");
System.exit(0);
}

private static void injectTraceContext(HttpRequest.Builder builder) {
TextMapSetter<HttpRequest.Builder> setter = HttpRequest.Builder::header;
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.inject(Context.current(), builder, setter);
}

span.end();
openTelemetrySdk.getSdkTracerProvider().shutdown();
Validation.validate();
System.out.println("Done");
System.exit(0);
private static void addDaprApiToken(HttpRequest.Builder builder) {
String token = Properties.API_TOKEN.get();
if (token != null) {
builder.header("dapr-api-token", token);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@

package io.dapr.examples.tracing;

import io.dapr.client.DaprClient;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.config.Properties;
import io.dapr.examples.OpenTelemetryInterceptor;
import io.dapr.utils.TypeRef;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import org.springframework.beans.factory.annotation.Autowired;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
* SpringBoot Controller to handle service invocation.
Expand All @@ -38,11 +39,7 @@ public class TracingDemoMiddleServiceController {

private static final String INVOKE_APP_ID = "tracingdemo";

/**
* Dapr client.
*/
@Autowired
private DaprClient client;
private static final HttpClient httpClient = HttpClient.newHttpClient();

/**
* Handles the 'echo' method invocation, by proxying a call into another service.
Expand All @@ -55,10 +52,14 @@ public class TracingDemoMiddleServiceController {
public Mono<byte[]> echo(
@RequestAttribute(name = "opentelemetry-context") Context context,
@RequestBody(required = false) String body) {
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
.setBody(body)
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
return Mono.fromFuture(() -> {
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(buildInvokeUrl("echo")))
.POST(HttpRequest.BodyPublishers.ofString(body != null ? body : ""));
injectTraceContext(builder, context);
addDaprApiToken(builder);
return httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.ofByteArray());
}).map(HttpResponse::body);
}

/**
Expand All @@ -69,9 +70,32 @@ public Mono<byte[]> echo(
*/
@PostMapping(path = "/proxy_sleep")
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
return Mono.fromFuture(() -> {
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(buildInvokeUrl("sleep")))
.POST(HttpRequest.BodyPublishers.noBody());
injectTraceContext(builder, context);
addDaprApiToken(builder);
return httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.discarding());
}).then();
}

private static String buildInvokeUrl(String method) {
int port = Properties.HTTP_PORT.get();
return "http://localhost:" + port + "/v1.0/invoke/" + INVOKE_APP_ID + "/method/" + method;
}

private static void injectTraceContext(HttpRequest.Builder builder, Context context) {
TextMapSetter<HttpRequest.Builder> setter = HttpRequest.Builder::header;
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.inject(context, builder, setter);
}

private static void addDaprApiToken(HttpRequest.Builder builder) {
String token = Properties.API_TOKEN.get();
if (token != null) {
builder.header("dapr-api-token", token);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@SuppressWarnings("deprecation")
public class MethodInvokeIT extends BaseIT {

//Number of messages to be sent: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;

@SuppressWarnings("deprecation")
public class TracingIT extends BaseIT {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;

@SuppressWarnings("deprecation")
public class TracingIT extends BaseIT {

/**
Expand Down
10 changes: 10 additions & 0 deletions sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public Mono<Void> publishEvent(String pubsubName, String topicName, Object data,
* {@inheritDoc}
*/
@Override
@Deprecated
public <T> Mono<T> invokeMethod(
String appId,
String methodName,
Expand All @@ -136,6 +137,7 @@ public <T> Mono<T> invokeMethod(
* {@inheritDoc}
*/
@Override
@Deprecated
public <T> Mono<T> invokeMethod(
String appId,
String methodName,
Expand All @@ -150,6 +152,7 @@ public <T> Mono<T> invokeMethod(
* {@inheritDoc}
*/
@Override
@Deprecated
public <T> Mono<T> invokeMethod(
String appId, String methodName, HttpExtension httpExtension, Map<String, String> metadata, TypeRef<T> type) {
return this.invokeMethod(appId, methodName, null, httpExtension, metadata, type);
Expand All @@ -159,6 +162,7 @@ public <T> Mono<T> invokeMethod(
* {@inheritDoc}
*/
@Override
@Deprecated
public <T> Mono<T> invokeMethod(
String appId, String methodName, HttpExtension httpExtension, Map<String, String> metadata, Class<T> clazz) {
return this.invokeMethod(appId, methodName, null, httpExtension, metadata, TypeRef.get(clazz));
Expand All @@ -168,6 +172,7 @@ public <T> Mono<T> invokeMethod(
* {@inheritDoc}
*/
@Override
@Deprecated
public <T> Mono<T> invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
TypeRef<T> type) {
return this.invokeMethod(appId, methodName, request, httpExtension, null, type);
Expand All @@ -177,6 +182,7 @@ public <T> Mono<T> invokeMethod(String appId, String methodName, Object request,
* {@inheritDoc}
*/
@Override
@Deprecated
public <T> Mono<T> invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
Class<T> clazz) {
return this.invokeMethod(appId, methodName, request, httpExtension, null, TypeRef.get(clazz));
Expand All @@ -186,6 +192,7 @@ public <T> Mono<T> invokeMethod(String appId, String methodName, Object request,
* {@inheritDoc}
*/
@Override
@Deprecated
public Mono<Void> invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension) {
return this.invokeMethod(appId, methodName, request, httpExtension, null, TypeRef.BYTE_ARRAY).then();
}
Expand All @@ -194,6 +201,7 @@ public Mono<Void> invokeMethod(String appId, String methodName, Object request,
* {@inheritDoc}
*/
@Override
@Deprecated
public Mono<Void> invokeMethod(
String appId, String methodName, Object request, HttpExtension httpExtension, Map<String, String> metadata) {
return this.invokeMethod(appId, methodName, request, httpExtension, metadata, TypeRef.BYTE_ARRAY).then();
Expand All @@ -203,6 +211,7 @@ public Mono<Void> invokeMethod(
* {@inheritDoc}
*/
@Override
@Deprecated
public Mono<Void> invokeMethod(
String appId, String methodName, HttpExtension httpExtension, Map<String, String> metadata) {
return this.invokeMethod(appId, methodName, null, httpExtension, metadata, TypeRef.BYTE_ARRAY).then();
Expand All @@ -212,6 +221,7 @@ public Mono<Void> invokeMethod(
* {@inheritDoc}
*/
@Override
@Deprecated
public Mono<byte[]> invokeMethod(
String appId, String methodName, byte[] request, HttpExtension httpExtension, Map<String, String> metadata) {
return this.invokeMethod(appId, methodName, request, httpExtension, metadata, TypeRef.BYTE_ARRAY);
Expand Down
Loading
Loading