|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import sys |
| 4 | +from functools import partial |
| 5 | +from typing import TYPE_CHECKING, ParamSpec, TypeVar, cast |
| 6 | + |
| 7 | +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor |
| 8 | +from opentelemetry.instrumentation.utils import unwrap |
| 9 | +from wrapt import register_post_import_hook, wrap_function_wrapper |
| 10 | + |
| 11 | +if TYPE_CHECKING: |
| 12 | + from collections.abc import Callable, Collection |
| 13 | + from types import ModuleType |
| 14 | + |
| 15 | + from opentelemetry.metrics import MeterProvider |
| 16 | + from opentelemetry.trace import TracerProvider |
| 17 | + |
| 18 | + from connectrpc.interceptor import Interceptor, InterceptorSync |
| 19 | + |
| 20 | +_instruments = ("connect-python>=0.9.0",) |
| 21 | + |
| 22 | +P = ParamSpec("P") |
| 23 | +R = TypeVar("R") |
| 24 | + |
| 25 | + |
| 26 | +class ConnectInstrumentor(BaseInstrumentor): |
| 27 | + def instrumentation_dependencies(self) -> Collection[str]: |
| 28 | + return _instruments |
| 29 | + |
| 30 | + def _instrument(self, **kwargs: object) -> None: |
| 31 | + self._meter_provider = cast( |
| 32 | + "MeterProvider | None", kwargs.get("meter_provider") |
| 33 | + ) |
| 34 | + self._tracer_provider = cast( |
| 35 | + "TracerProvider | None", kwargs.get("tracer_provider") |
| 36 | + ) |
| 37 | + |
| 38 | + register_post_import_hook(self._patch_client, "connectrpc.client") |
| 39 | + register_post_import_hook(self._patch_server, "connectrpc.server") |
| 40 | + |
| 41 | + def _uninstrument(self, **kwargs: object) -> None: |
| 42 | + # TODO: Remove sys.modules check after |
| 43 | + # https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4321 |
| 44 | + if "connectrpc.client" in sys.modules: |
| 45 | + unwrap("connectrpc.client.ConnectClient", "__init__") |
| 46 | + unwrap("connectrpc.client.ConnectClientSync", "__init__") |
| 47 | + if "connectrpc.server" in sys.modules: |
| 48 | + unwrap("connectrpc.server.ConnectASGIApplication", "__init__") |
| 49 | + unwrap("connectrpc.server.ConnectWSGIApplication", "__init__") |
| 50 | + |
| 51 | + def _patch_client(self, module: ModuleType) -> None: |
| 52 | + wrap_function_wrapper( |
| 53 | + module, "ConnectClient.__init__", partial(self._init_wrapper, client=True) |
| 54 | + ) |
| 55 | + wrap_function_wrapper( |
| 56 | + module, |
| 57 | + "ConnectClientSync.__init__", |
| 58 | + partial(self._init_wrapper, client=True), |
| 59 | + ) |
| 60 | + |
| 61 | + def _patch_server(self, module: ModuleType) -> None: |
| 62 | + wrap_function_wrapper( |
| 63 | + module, |
| 64 | + "ConnectASGIApplication.__init__", |
| 65 | + partial(self._init_wrapper, client=False), |
| 66 | + ) |
| 67 | + wrap_function_wrapper( |
| 68 | + module, |
| 69 | + "ConnectWSGIApplication.__init__", |
| 70 | + partial(self._init_wrapper, client=False), |
| 71 | + ) |
| 72 | + |
| 73 | + def _init_wrapper( |
| 74 | + self, |
| 75 | + wrapped: Callable[P, R], |
| 76 | + _instance: object, |
| 77 | + args: tuple, |
| 78 | + kwargs: dict, |
| 79 | + *, |
| 80 | + client: bool, |
| 81 | + ) -> R: |
| 82 | + # Instrumentation doesn't eager import the library it instruments |
| 83 | + from connectrpc_otel import OpenTelemetryInterceptor # noqa: PLC0415 |
| 84 | + |
| 85 | + interceptors: list[Interceptor | InterceptorSync] | None = kwargs.get( |
| 86 | + "interceptors" |
| 87 | + ) |
| 88 | + interceptors = [] if interceptors is None else [*interceptors] |
| 89 | + if any(isinstance(i, OpenTelemetryInterceptor) for i in interceptors): |
| 90 | + return wrapped(*args, **kwargs) |
| 91 | + |
| 92 | + kwargs["interceptors"] = interceptors |
| 93 | + # OpenTelemetry interceptor should be first so i.e. logging interceptors |
| 94 | + # have trace IDs. |
| 95 | + interceptors.insert( |
| 96 | + 0, |
| 97 | + OpenTelemetryInterceptor( |
| 98 | + tracer_provider=self._tracer_provider, |
| 99 | + meter_provider=self._meter_provider, |
| 100 | + client=client, |
| 101 | + ), |
| 102 | + ) |
| 103 | + |
| 104 | + return wrapped(*args, **kwargs) |
0 commit comments