Skip to content

feat(Instrumentation/Messenger): implement worker and middleware instrumentation#173

Open
jvocampings wants to merge 27 commits intoFriendsOfOpenTelemetry:mainfrom
jvocampings:messenger-instrumentation
Open

feat(Instrumentation/Messenger): implement worker and middleware instrumentation#173
jvocampings wants to merge 27 commits intoFriendsOfOpenTelemetry:mainfrom
jvocampings:messenger-instrumentation

Conversation

@jvocampings
Copy link
Copy Markdown

@jvocampings jvocampings commented Apr 17, 2025

WIP.

Related to issue #172

Encapsulate consumption of message in a Span. Based on Messenger events.

Missing instrumentation when a message is dispatched. Not sure how to implement it.

edit: I assume we could reuse TraceableMessengerTransport to decorate the method send.

@jvocampings jvocampings requested a review from a team as a code owner April 17, 2025 15:35
Comment thread src/Instrumentation/Symfony/Messenger/EventSubscriber/EndSpanEventSubscriber.php Outdated
Comment thread src/Instrumentation/Symfony/Messenger/EventSubscriber/EndSpanEventSubscriber.php Outdated
Comment thread src/Instrumentation/Symfony/Console/TraceableConsoleEventSubscriber.php Outdated
Comment thread src/Instrumentation/Symfony/Messenger/EventSubscriber/EndSpanEventSubscriber.php Outdated
Comment thread src/Instrumentation/Symfony/Messenger/EventSubscriber/EndSpanEventSubscriber.php Outdated
@gaelreyrol
Copy link
Copy Markdown
Contributor

gaelreyrol commented Apr 18, 2025

edit: I assume we could reuse TraceableMessengerTransport to decorate the method send.
You mean attach the trace context to the message?

Thanks again for your preliminary work on this, it's not an easy part and it will take some time to get the right implementation!

This PR might also close this issue #168.

@jvocampings jvocampings changed the title chore(messenger): use Messenger events to start/end span for instrumentation chore(messenger): use Messenger instrumentation Apr 18, 2025
@jvocampings
Copy link
Copy Markdown
Author

edit: I assume we could reuse TraceableMessengerTransport to decorate the method send.
You mean attach the trace context to the message?

Ah I was talking about Instrumentation and not Propagation.
When I dispatch a message which is consumed asynchronously, I'd like to create a span to have a trace of it and to see how many time it took to send messages to my broker (rabbitmq, sqs etc)

@jvocampings
Copy link
Copy Markdown
Author

jvocampings commented Apr 18, 2025

@gaelreyrol I made some modifications related to what you asked. I'll be on holidays for 2 weeks. After that there are big changes at my work then I don't know if I'll be able to reactive. I'll do my best.

@gaelreyrol
Copy link
Copy Markdown
Contributor

Ah I was talking about Instrumentation and not Propagation. When I dispatch a message which is consumed asynchronously, I'd like to create a span to have a trace of it and to see how many time it took to send messages to my broker (rabbitmq, sqs etc)

It can't be done from the event and the TraceableMessengerTransport is already doing that, you just need to wrap the transport DNS with the trace() decorator. Check out this file: https://github.com/FriendsOfOpenTelemetry/opentelemetry-bundle/blob/main/tests/Functional/Application/config/packages/framework.yaml#L14

I made some modifications related to what you asked. I'll be on holiday for 2 weeks. After that there are big changes at my work then I don't know if I'll be able to reactive. I'll do my best.

Thank you very much, don't worry at all, have a good holiday! I might work on your PR directly if I have enough time.

Comment thread src/Instrumentation/Symfony/Messenger/InstrumentationEventSubscriber.php Outdated
Comment thread src/Instrumentation/Symfony/Messenger/InstrumentationEventSubscriber.php Outdated
jvocampings and others added 7 commits April 10, 2026 16:25
Fix several issues in the WorkerMessageEventSubscriber introduced by PR FriendsOfOpenTelemetry#173:
- Replace SDK Span import with API Span to respect API/SDK separation
- Implement InstrumentationTypeInterface for consistency with other subscribers
- Add event priorities (10000/-10000) to wrap all other processing
- Add messaging semantic convention attributes (operation.type, destination.name)
- Include message class name in span name for better trace readability
- Remove stale imports and duplicate propagation middleware service definition
- Clean up propagation middleware when messenger tracing is disabled
- Add PHPStan baseline entries for untyped $carrier interface params
- Add functional tests for worker message handled, failed, and attribute mode
- Reorganize messenger tests into Messenger/ subdirectory
- Disable retry on test transport to isolate worker span assertions
…leware

Cover TraceableMessengerTransport (get/ack/reject spans + TransportException
error recording) and AddStampForPropagationMiddleware (stamp skip, no-scope
passthrough, active-scope injection).
@gaelreyrol gaelreyrol changed the title chore(messenger): use Messenger instrumentation feat(Instrumentation/Messenger): implement worker and middleware instrumentation Apr 12, 2026
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 12, 2026

Codecov Report

❌ Patch coverage is 98.08612% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.32%. Comparing base (b628f4d) to head (66387cb).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...tion/Symfony/Messenger/TraceableMessengerStack.php 88.88% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #173      +/-   ##
============================================
+ Coverage     91.21%   92.32%   +1.11%     
- Complexity      692      741      +49     
============================================
  Files           116      121       +5     
  Lines          2789     2971     +182     
============================================
+ Hits           2544     2743     +199     
+ Misses          245      228      -17     
Flag Coverage Δ
phpunit 92.32% <98.08%> (+1.11%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Align WorkerMessageEventSubscriber.endSpanOnError() with every other
subscriber in the codebase (HttpKernel, Console, Doctrine, Mailer,
TransportTracer) by using $span->recordException() which creates a
proper OTel exception event with exception.type, exception.message,
and exception.stacktrace per the OTel semantic conventions.

Update tests to assert on span events rather than span attributes.
When Symfony Messenger handlers fail, the exception is wrapped in a
HandlerFailedException (or other WrappedExceptionsInterface implementors
like DelayedMessageHandlingException). Without unwrapping, the span
records the generic wrapper message instead of the actual root causes.

Now each nested exception is recorded as a separate span event via
recordException(), matching how Sentry handles these wrapped exceptions.
Replace the hardcoded $notSupportedCommands array and isNotSupported()
method with a default value on the existing exclude_commands config node.

messenger:consume is now the default entry in console.tracing.exclude_commands,
following the same pattern as Sentry's configurable excluded_commands. Users
can override the list or clear it entirely via configuration.
Cover all branches of set(), get(), and keys() methods:
- Valid envelope carrier for set/get/keys
- Invalid carrier throws InvalidArgumentException
- Non-traceparent key is ignored (set) or returns null (get)
- Missing TraceStamp returns null from get()

Closes the 34.78% patch coverage gap reported by Codecov.
The W3C Trace Context spec defines two headers: traceparent and
tracestate. Previously only traceparent was propagated, silently
dropping vendor-specific trace context (Datadog, AWS X-Ray, etc.).

Add optional traceState property to TraceStamp and update
TraceStampPropagator to handle both keys in set(), get(), and keys().
When tracestate arrives after traceparent, the existing stamp is
replaced with one carrying both values.
….name

bus.name is not a standard OTel semantic convention. Rename to
symfony.messenger.bus.name to follow the existing codebase pattern
(symfony.console.*, symfony.kernel.*).

Only changes the worker subscriber attribute; the pre-existing
bus.name in TraceableMessengerStack is left unchanged (out of scope).
If startSpan fires but neither endSpanWithSuccess nor endSpanOnError
fires (e.g. worker killed, unhandled error in another subscriber),
the OTel context scope leaks into subsequent messages.

Now startSpan checks for a lingering scope at the beginning of each
message and cleans it up: detaches the scope, marks the orphaned span
as ERROR, and ends it. This prevents context pollution across messages.
…namespace span attributes

- Namespace middleware span attributes (bus.name, event.category, event.current)
  under symfony.messenger.* prefix for consistency with worker subscriber
- Add symfony.messenger.will_retry and symfony.messenger.retry_count attributes
  to failure spans so operators can distinguish retriable from terminal failures
- Support #[Traceable] attribute on message classes for attribute-based
  instrumentation mode, with custom tracer selection via TracerLocatorPass
…erage

Mark all new types as final for consistency with existing codebase.
Register TraceStampPropagator as a shared DI service instead of inline
construction. Default instrumentationType to Auto, add safe tracer
locator fallback, nullable logger consistency, debug logging on skipped
messages and null scopes, and remove dead commented-out code. Add
end-to-end propagation test, WorkerMessageEventSubscriber unit tests,
and PropagatorFactory test.
Symfony's getHeaders() normalizes header names to lowercase, so the
Content-Length check was unreachable. Access the first array element
since headers are returned as arrays.
…leware spans

TraceableMessengerStack now tracks its own scope, span, and parent context
instead of probing global context storage — fixing scope leaks where earlier
middleware spans were never ended. TraceableMessengerMiddleware catches
exceptions and records them with STATUS_ERROR on the active span, aligning
with every other instrumentation in the bundle.
TraceableMessengerTransportFactory was tagged `kernel.reset` with
method `reset`, but the class has no such method. Symfony's
ServicesResetter invokes the declared method on every tagged service
between worker iterations, so `messenger:consume` threw as soon as a
retry (or any post-handler reset) ran:

    Attempted to call an undefined method named "reset" of class
    "...\TraceableMessengerTransportFactory".

The tag was introduced with the original messenger tracing (6f3c032)
and never had a corresponding method — likely speculative, possibly
confused with messenger transports that sometimes implement
ResetInterface.

Removal is preferred over adding an empty reset(): the factory's
dependencies (inner TransportFactory, TracerInterface, LoggerInterface)
are immutable DI services, createTransport() returns a fresh
TraceableMessengerTransport without retaining a reference, and
supports() is pure — there is no per-message state to clear. An empty
reset() would silence the error but imply a ResetInterface-style
contract the class does not fulfill and mislead anyone later adding
mutable state.
@gaelreyrol
Copy link
Copy Markdown
Contributor

gaelreyrol commented Apr 18, 2026

Hi @jvocampings — quick update on where the PR stands. cc @kochen (ref #190) — this should cover your use case too; would love your feedback.

Review feedback addressed

  • Merged StartSpan/EndSpan subscribers into a single WorkerMessageEventSubscriber at the root of the Messenger folder
  • Removed strict_types throughout src/
  • Span lifecycle now hooks WorkerMessageReceivedEvent (start) and WorkerMessageHandledEvent/WorkerMessageFailedEvent (end)
  • Error path ends the span and records the exception via recordException(); HandlerFailedException is unwrapped so the root cause surfaces in traces

Middleware & propagation

  • Kept the middleware per the sync/async flow we discussed — it creates spans in both paths so dispatch and handling phases are visible
  • Trace context is propagated through a dedicated stamp carrying both traceparent and tracestate (no longer AMQP-specific)
  • Explicit scope tracking with leak safety; errors are recorded on middleware spans too

Attributes, config, tests

  • Span attributes namespaced as symfony.messenger.*
  • messenger:consume exclusion is now configurable (Console instrumentation)
  • Added functional tests for the worker subscriber and unit tests for TraceStampPropagator, transport tracing, and the propagation middleware
  • Drive-by: fixed an erroneous kernel.reset tag on the transport factory and a lowercase Content-Length lookup in HttpClient

@jvocampings — could you give the branch a spin on your setup to confirm nothing regressed vs. your original intent? And @kochen, if you're able to test against your PDF-generator flow, that would be a great real-world validation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants