-
Notifications
You must be signed in to change notification settings - Fork 8
Added OTEL tracing #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
davidigandan
wants to merge
42
commits into
main
Choose a base branch
from
dev
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Added OTEL tracing #196
Changes from all commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
722bf3e
Add basic tracing middleware and global control
davidigandan 52cb04d
Instrument on subscribe and add dcid to span attributes
davidigandan cc9ee12
Add spanid and traceid metadata to greylog
davidigandan f7cc658
Add recipe_id to spans
davidigandan 8b2a2f1
Add dev and prod dependencies
davidigandan 0686e28
Remove dcid extract from message and inject to span logic. Will be ad…
davidigandan 2d9e21c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 3a5283a
Use plugin configurations to configure connection to OTELCollector
davidigandan 4b999f1
Remove vestigial dcid handling and unnecessary debug statements
davidigandan 4b86715
remove unhelpful docstring
davidigandan 9c13d07
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan 3e0b902
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] d446e80
imported OTEL config class to common_service
davidigandan 16b0e10
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 7ad857f
add marshmallow dependency
davidigandan 468f940
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan 7aae664
add zocalo dependency
davidigandan 902a7df
Fix possibly unbound error
davidigandan 9e5adb7
Moved plugin functionality to python-workflows
davidigandan 1d7457e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] ff3679c
Fixed typos, vestigial code and improper use of log_extender
davidigandan e6be925
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan aebe1ca
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan 77eb9e9
Implement ExitStack() to manage multiple context managers and clean t…
davidigandan 5f94077
Fix broken tracing functionality
davidigandan 786bd00
Fix
davidigandan 33fbce2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] ed6bc93
Fix rw.environment.get('ID') bug
davidigandan 2369ba1
Ensure environment and environment.id exists
davidigandan 554baa2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 7221ec4
Remove the need for enironment variable in mock
davidigandan b7e7999
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 2fd00fa
Fix ruff error
davidigandan e4f69ed
removed redundant libs from requirements_dev.txt
davidigandan a6fdbb1
Use compatible release pinning
davidigandan 229214f
Remove the option to manually configure full endpoint
davidigandan 30a67d2
remove debugging
davidigandan 6897e13
remove debugging
davidigandan 5aae712
moved otel_logs closer to relevant code block
davidigandan 0d4d825
simplified logic for log context and corrected self.config.openteleme…
davidigandan 106fcd8
abstracted away span attribute setting logic
davidigandan d437b47
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,227 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import functools | ||
| from collections.abc import Callable | ||
|
|
||
| from opentelemetry import trace | ||
| from opentelemetry.context import Context | ||
| from opentelemetry.propagate import extract, inject | ||
|
|
||
| from workflows.transport.common_transport import MessageCallback, TemporarySubscription | ||
|
|
||
|
|
||
| class OTELTracingMiddleware: | ||
| def __init__(self, tracer: trace.Tracer, service_name: str): | ||
| self.tracer = tracer | ||
| self.service_name = service_name | ||
|
|
||
| def _set_span_attributes(self, span, **attributes): | ||
| """Helper method to set common span attributes""" | ||
| span.set_attribute("service_name", self.service_name) | ||
| for key, value in attributes.items(): | ||
| if value is not None: | ||
| span.set_attribute(key, value) | ||
|
|
||
| def send(self, call_next: Callable, destination: str, message, **kwargs): | ||
| # Get current span context (may be None if this is the root span) | ||
| current_span = trace.get_current_span() | ||
| parent_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else None | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.send", | ||
| context=parent_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, destination=destination) | ||
|
|
||
| # Inject the current trace context into the message headers | ||
| headers = kwargs.get("headers", {}) | ||
| if headers is None: | ||
| headers = {} | ||
| inject(headers) # This modifies headers in-place | ||
| kwargs["headers"] = headers | ||
|
|
||
| return call_next(destination, message, **kwargs) | ||
|
|
||
| def subscribe( | ||
| self, call_next: Callable, channel: str, callback: Callable, **kwargs | ||
| ) -> int: | ||
| @functools.wraps(callback) | ||
| def wrapped_callback(header, message): | ||
| # Extract trace context from message headers | ||
| ctx = extract(header) if header else Context() | ||
|
|
||
| # Start a new span with the extracted context | ||
| with self.tracer.start_as_current_span( | ||
| "transport.subscribe", | ||
| context=ctx, | ||
| ) as span: | ||
| self._set_span_attributes(span, channel=channel) | ||
|
|
||
| # Call the original callback - this will process the message | ||
| # and potentially call send() which will pick up this context | ||
| return callback(header, message) | ||
|
|
||
| return call_next(channel, wrapped_callback, **kwargs) | ||
|
|
||
| def subscribe_broadcast( | ||
| self, call_next: Callable, channel: str, callback: Callable, **kwargs | ||
| ) -> int: | ||
| @functools.wraps(callback) | ||
| def wrapped_callback(header, message): | ||
| # Extract trace context from message headers | ||
| ctx = extract(header) if header else Context() | ||
|
|
||
| # Start a new span with the extracted context | ||
| with self.tracer.start_as_current_span( | ||
| "transport.subscribe_broadcast", | ||
| context=ctx, | ||
| ) as span: | ||
| self._set_span_attributes(span, channel=channel) | ||
|
|
||
| return callback(header, message) | ||
|
|
||
| return call_next(channel, wrapped_callback, **kwargs) | ||
|
|
||
| def subscribe_temporary( | ||
| self, | ||
| call_next: Callable, | ||
| channel_hint: str | None, | ||
| callback: MessageCallback, | ||
| **kwargs, | ||
| ) -> TemporarySubscription: | ||
| @functools.wraps(callback) | ||
| def wrapped_callback(header, message): | ||
| # Extract trace context from message headers | ||
| ctx = extract(header) if header else Context() | ||
|
|
||
| # Start a new span with the extracted context | ||
| with self.tracer.start_as_current_span( | ||
| "transport.subscribe_temporary", | ||
| context=ctx, | ||
| ) as span: | ||
| self._set_span_attributes(span, channel_hint=channel_hint) | ||
|
|
||
| return callback(header, message) | ||
|
|
||
| return call_next(channel_hint, wrapped_callback, **kwargs) | ||
|
|
||
| def unsubscribe( | ||
| self, | ||
| call_next: Callable, | ||
| subscription: int, | ||
| drop_callback_reference=False, | ||
| **kwargs, | ||
| ): | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.unsubscribe", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription) | ||
|
|
||
| call_next( | ||
| subscription, drop_callback_reference=drop_callback_reference, **kwargs | ||
| ) | ||
|
|
||
| def ack( | ||
| self, | ||
| call_next: Callable, | ||
| message, | ||
| subscription_id: int | None = None, | ||
| **kwargs, | ||
| ): | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.ack", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription_id) | ||
|
|
||
| call_next(message, subscription_id=subscription_id, **kwargs) | ||
|
|
||
| def nack( | ||
| self, | ||
| call_next: Callable, | ||
| message, | ||
| subscription_id: int | None = None, | ||
| **kwargs, | ||
| ): | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.nack", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription_id) | ||
|
|
||
| call_next(message, subscription_id=subscription_id, **kwargs) | ||
|
|
||
| def transaction_begin( | ||
| self, call_next: Callable, subscription_id: int | None = None, **kwargs | ||
| ) -> int: | ||
| """Start a new transaction span""" | ||
| # Get current span context (may be None if this is the root span) | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transaction.begin", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription_id) | ||
|
|
||
| return call_next(subscription_id=subscription_id, **kwargs) | ||
|
|
||
| def transaction_abort( | ||
| self, call_next: Callable, transaction_id: int | None = None, **kwargs | ||
| ): | ||
| """Abort a transaction span""" | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transaction.abort", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, transaction_id=transaction_id) | ||
|
|
||
| call_next(transaction_id=transaction_id, **kwargs) | ||
|
|
||
| def transaction_commit( | ||
| self, call_next: Callable, transaction_id: int | None = None, **kwargs | ||
| ): | ||
| """Commit a transaction span""" | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transaction.commit", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, transaction_id=transaction_id) | ||
|
|
||
| call_next(transaction_id=transaction_id, **kwargs) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.