Skip to content

Commit e6d572d

Browse files
authored
Implement OpenTelemetry metrics (#160)
Signed-off-by: Anuraag Agrawal <anuraaga@gmail.com>
1 parent 83641c3 commit e6d572d

5 files changed

Lines changed: 285 additions & 126 deletions

File tree

connectrpc-otel/connectrpc_otel/_interceptor.py

Lines changed: 77 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from __future__ import annotations
22

3+
import time
34
from contextlib import AbstractContextManager, contextmanager
45
from typing import TYPE_CHECKING, TypeAlias, TypeVar, cast
56

7+
from opentelemetry.metrics import MeterProvider, get_meter_provider
68
from opentelemetry.propagate import get_global_textmap
79
from opentelemetry.propagators.textmap import Setter, TextMapPropagator, default_setter
810
from opentelemetry.trace import (
@@ -12,15 +14,18 @@
1214
get_current_span,
1315
get_tracer_provider,
1416
)
17+
from opentelemetry.util.types import AttributeValue
1518

1619
from connectrpc.errors import ConnectError
1720

1821
from ._semconv import (
1922
CLIENT_ADDRESS,
2023
CLIENT_PORT,
2124
ERROR_TYPE,
25+
RPC_CLIENT_CALL_DURATION,
2226
RPC_METHOD,
2327
RPC_RESPONSE_STATUS_CODE,
28+
RPC_SERVER_CALL_DURATION,
2429
RPC_SYSTEM_NAME,
2530
SERVER_ADDRESS,
2631
SERVER_PORT,
@@ -31,14 +36,12 @@
3136
if TYPE_CHECKING:
3237
from collections.abc import Iterator, MutableMapping
3338

34-
from opentelemetry.util.types import AttributeValue
35-
3639
from connectrpc.request import RequestContext
3740

3841
REQ = TypeVar("REQ")
3942
RES = TypeVar("RES")
4043

41-
Token: TypeAlias = tuple[AbstractContextManager, Span]
44+
Token: TypeAlias = tuple[AbstractContextManager, Span, float, dict[str, AttributeValue]]
4245

4346
# Workaround bad typing
4447
_DEFAULT_TEXTMAP_SETTER = cast("Setter[MutableMapping[str, str]]", default_setter)
@@ -52,6 +55,7 @@ def __init__(
5255
*,
5356
propagator: TextMapPropagator | None = None,
5457
tracer_provider: TracerProvider | None = None,
58+
meter_provider: MeterProvider | None = None,
5559
client: bool = False,
5660
) -> None:
5761
"""Creates a new OpenTelemetry interceptor.
@@ -68,13 +72,51 @@ def __init__(
6872
self._tracer = tracer_provider.get_tracer("connectrpc-otel", __version__)
6973
self._propagator = propagator or get_global_textmap()
7074

75+
meter_provider = meter_provider or get_meter_provider()
76+
meter = meter_provider.get_meter("connectrpc-otel", __version__)
77+
78+
self._call_duration = meter.create_histogram(
79+
name=(RPC_CLIENT_CALL_DURATION if client else RPC_SERVER_CALL_DURATION),
80+
description=f"Measures the duration of an {'outgoing' if client else 'incoming'} Remote Procedure Call (RPC)",
81+
unit="s",
82+
explicit_bucket_boundaries_advisory=[
83+
0.005,
84+
0.01,
85+
0.025,
86+
0.05,
87+
0.075,
88+
0.1,
89+
0.25,
90+
0.5,
91+
0.75,
92+
1,
93+
2.5,
94+
5,
95+
7.5,
96+
10,
97+
],
98+
)
99+
71100
async def on_start(self, ctx: RequestContext) -> Token:
72101
return self.on_start_sync(ctx)
73102

74103
def on_start_sync(self, ctx: RequestContext) -> Token:
75-
cm = self._start_span(ctx)
104+
start_time = time.perf_counter()
105+
106+
rpc_method = f"{ctx.method().service_name}/{ctx.method().name}"
107+
shared_attrs: dict[str, AttributeValue] = {
108+
RPC_SYSTEM_NAME: RpcSystemNameValues.CONNECTRPC.value,
109+
RPC_METHOD: rpc_method,
110+
}
111+
112+
if sa := ctx.server_address():
113+
addr, port = sa.rsplit(":", 1)
114+
shared_attrs[SERVER_ADDRESS] = addr
115+
shared_attrs[SERVER_PORT] = int(port)
116+
117+
cm = self._start_span(ctx, rpc_method, shared_attrs)
76118
span = cm.__enter__()
77-
return cm, span
119+
return cm, span, start_time, shared_attrs
78120

79121
async def on_end(
80122
self, token: Token, ctx: RequestContext, error: Exception | None
@@ -84,15 +126,28 @@ async def on_end(
84126
def on_end_sync(
85127
self, token: Token, ctx: RequestContext, error: Exception | None
86128
) -> None:
87-
cm, span = token
88-
self._finish_span(span, error)
129+
cm, span, start_time, shared_attrs = token
130+
end_time = time.perf_counter()
131+
error_attrs = self._get_error_attributes(error)
132+
if error_attrs:
133+
span.set_attributes(error_attrs)
134+
# Won't use shared_attrs anymore, no need to copy.
135+
metric_attrs = shared_attrs
136+
if error_attrs:
137+
metric_attrs.update(error_attrs)
138+
self._call_duration.record(end_time - start_time, metric_attrs)
89139
if error:
90140
cm.__exit__(type(error), error, error.__traceback__)
91141
else:
92142
cm.__exit__(None, None, None)
93143

94144
@contextmanager
95-
def _start_span(self, ctx: RequestContext) -> Iterator[Span]:
145+
def _start_span(
146+
self,
147+
ctx: RequestContext,
148+
span_name: str,
149+
shared_attrs: dict[str, AttributeValue],
150+
) -> Iterator[Span]:
96151
parent_otel_ctx = None
97152
if self._client:
98153
span_kind = SpanKind.CLIENT
@@ -105,30 +160,27 @@ def _start_span(self, ctx: RequestContext) -> Iterator[Span]:
105160
carrier = ctx.request_headers()
106161
parent_otel_ctx = self._propagator.extract(carrier)
107162

108-
rpc_method = f"{ctx.method().service_name}/{ctx.method().name}"
163+
attrs: dict[str, AttributeValue] = shared_attrs.copy()
109164

110-
attrs: MutableMapping[str, AttributeValue] = {
111-
RPC_SYSTEM_NAME: RpcSystemNameValues.CONNECTRPC.value,
112-
RPC_METHOD: rpc_method,
113-
}
114-
if sa := ctx.server_address():
115-
addr, port = sa.rsplit(":", 1)
116-
attrs[SERVER_ADDRESS] = addr
117-
attrs[SERVER_PORT] = int(port)
118165
if ca := ctx.client_address():
119166
addr, port = ca.rsplit(":", 1)
120167
attrs[CLIENT_ADDRESS] = addr
121168
attrs[CLIENT_PORT] = int(port)
122169

123170
with self._tracer.start_as_current_span(
124-
rpc_method, kind=span_kind, attributes=attrs, context=parent_otel_ctx
171+
span_name, kind=span_kind, attributes=attrs, context=parent_otel_ctx
125172
) as span:
126173
yield span
127174

128-
def _finish_span(self, span: Span, error: Exception | None) -> None:
129-
if error:
130-
if isinstance(error, ConnectError):
131-
span.set_attribute(RPC_RESPONSE_STATUS_CODE, error.code.value)
132-
else:
133-
span.set_attribute(RPC_RESPONSE_STATUS_CODE, "unknown")
134-
span.set_attribute(ERROR_TYPE, type(error).__qualname__)
175+
def _get_error_attributes(
176+
self, error: Exception | None
177+
) -> dict[str, AttributeValue] | None:
178+
if not error:
179+
return None
180+
181+
return {
182+
ERROR_TYPE: type(error).__qualname__,
183+
RPC_RESPONSE_STATUS_CODE: error.code.value
184+
if isinstance(error, ConnectError)
185+
else "unknown",
186+
}

connectrpc-otel/connectrpc_otel/_semconv.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
CLIENT_ADDRESS: Final = "client.address"
1212
CLIENT_PORT: Final = "client.port"
1313
ERROR_TYPE: Final = "error.type"
14+
RPC_CLIENT_CALL_DURATION: Final = "rpc.client.call.duration"
15+
RPC_SERVER_CALL_DURATION: Final = "rpc.server.call.duration"
1416
RPC_METHOD: Final = "rpc.method"
1517
RPC_RESPONSE_STATUS_CODE: Final = "rpc.response.status_code"
1618
RPC_SYSTEM_NAME: Final = "rpc.system.name"

0 commit comments

Comments
 (0)