Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
722bf3e
Add basic tracing middleware and global control
davidigandan Jan 13, 2026
52cb04d
Instrument on subscribe and add dcid to span attributes
davidigandan Jan 26, 2026
cc9ee12
Add spanid and traceid metadata to greylog
davidigandan Jan 26, 2026
f7cc658
Add recipe_id to spans
davidigandan Jan 26, 2026
8b2a2f1
Add dev and prod dependencies
davidigandan Jan 26, 2026
0686e28
Remove dcid extract from message and inject to span logic. Will be ad…
davidigandan Jan 26, 2026
2d9e21c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
3a5283a
Use plugin configurations to configure connection to OTELCollector
davidigandan Jan 26, 2026
4b999f1
Remove vestigial dcid handling and unnecessary debug statements
davidigandan Jan 26, 2026
4b86715
remove unhelpful docstring
davidigandan Jan 26, 2026
9c13d07
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 26, 2026
3e0b902
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
d446e80
imported OTEL config class to common_service
davidigandan Jan 26, 2026
16b0e10
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
7ad857f
add marshmallow dependency
davidigandan Jan 27, 2026
468f940
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 27, 2026
7aae664
add zocalo dependency
davidigandan Jan 27, 2026
902a7df
Fix possibly unbound error
davidigandan Jan 27, 2026
9e5adb7
Moved plugin functionality to python-workflows
davidigandan Feb 2, 2026
1d7457e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 2, 2026
ff3679c
Fixed typos, vestigial code and improper use of log_extender
davidigandan Feb 6, 2026
e6be925
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan Feb 6, 2026
aebe1ca
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan Feb 6, 2026
77eb9e9
Implement ExitStack() to manage multiple context managers and clean t…
davidigandan Feb 6, 2026
5f94077
Fix broken tracing functionality
davidigandan Feb 17, 2026
786bd00
Fix
davidigandan Feb 17, 2026
33fbce2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
ed6bc93
Fix rw.environment.get('ID') bug
davidigandan Feb 17, 2026
2369ba1
Ensure environment and environment.id exists
davidigandan Feb 17, 2026
554baa2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
7221ec4
Remove the need for enironment variable in mock
davidigandan Feb 17, 2026
b7e7999
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
2fd00fa
Fix ruff error
davidigandan Feb 17, 2026
e4f69ed
removed redundant libs from requirements_dev.txt
davidigandan Feb 24, 2026
a6fdbb1
Use compatible release pinning
davidigandan Feb 24, 2026
229214f
Remove the option to manually configure full endpoint
davidigandan Feb 24, 2026
30a67d2
remove debugging
davidigandan Feb 24, 2026
6897e13
remove debugging
davidigandan Feb 24, 2026
5aae712
moved otel_logs closer to relevant code block
davidigandan Feb 24, 2026
0d4d825
simplified logic for log context and corrected self.config.openteleme…
davidigandan Feb 24, 2026
106fcd8
abstracted away span attribute setting logic
davidigandan Feb 24, 2026
d437b47
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
license = { text = "BSD-3-Clause" }
requires-python = ">=3.10"
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api~=1.20.0", "opentelemetry-sdk~=1.20.0", "opentelemetry-exporter-otlp-proto-http~=1.20.0" ]

[project.urls]
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
Expand Down Expand Up @@ -53,6 +53,7 @@ OfflineTransport = "workflows.transport.offline_transport:OfflineTransport"
pika = "workflows.util.zocalo.configuration:Pika"
stomp = "workflows.util.zocalo.configuration:Stomp"
transport = "workflows.util.zocalo.configuration:DefaultTransport"
opentelemetry = "workflows.util.zocalo.configuration:OTEL"

[project.scripts]
"workflows.validate_recipe" = "workflows.recipe.validate:main"
Expand Down
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-timeout==2.3.1
stomp-py==8.1.2
websocket-client==1.8.0
websocket-client==1.8.0
32 changes: 30 additions & 2 deletions src/workflows/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import functools
import logging
from collections.abc import Callable
from contextlib import ExitStack
from typing import Any

from opentelemetry import trace

from workflows.recipe.recipe import Recipe
from workflows.recipe.validate import validate_recipe
from workflows.recipe.wrapper import RecipeWrapper
Expand Down Expand Up @@ -69,10 +72,35 @@ def unwrap_recipe(header, message):
message = mangle_for_receiving(message)
if header.get("workflows-recipe") in {True, "True", "true", 1}:
rw = RecipeWrapper(message=message, transport=transport_layer)
if log_extender and rw.environment and rw.environment.get("ID"):
with log_extender("recipe_ID", rw.environment["ID"]):

if log_extender and rw.environment["ID"]:
# Extract recipe ID from environment and add to current span
span = trace.get_current_span()
recipe_id = rw.environment["ID"]
span.set_attribute("recipe_id", recipe_id)

# Extract span_id and trace_id for logging
span_context = span.get_span_context()
otel_logs = None
if span_context.is_valid:
span_id = span_context.span_id
trace_id = span_context.trace_id

otel_logs = {
"span_id": span_id,
"trace_id": trace_id,
"recipe_id": recipe_id,
}

with ExitStack() as stack:
# Configure the context depending on if service is emitting spans
stack.enter_context(log_extender("recipe_ID", recipe_id))
if otel_logs:
stack.enter_context(log_extender("otel_logs", otel_logs))
return callback(rw, header, message.get("payload"))

return callback(rw, header, message.get("payload"))

if allow_non_recipe_messages:
return callback(None, header, message)
# self.log.warning('Discarding non-recipe message:\n' + \
Expand Down
41 changes: 41 additions & 0 deletions src/workflows/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
import time
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import workflows
import workflows.logging
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware


class Status(enum.Enum):
Expand Down Expand Up @@ -185,6 +192,40 @@ def start_transport(self):
self.transport.subscription_callback_set_intercept(
self._transport_interceptor
)

# Configure OTELTracing if configuration is available
otel_config = (
self.config._opentelemetry
if self.config and hasattr(self.config, "_opentelemetry")
else None
)
if otel_config:
# Configure OTELTracing
resource = Resource.create(
{
SERVICE_NAME: self._service_name,
}
)

self.log.debug("Configuring OTELTracing")
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# Configure BatchProcessor and OTLPSpanExporter using config values
otlp_exporter = OTLPSpanExporter(
endpoint=otel_config["endpoint"],
timeout=otel_config.get("timeout", 10),
)
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)

# Add OTELTracingMiddleware to the transport layer
tracer = trace.get_tracer(__name__)
otel_middleware = OTELTracingMiddleware(
tracer, service_name=self._service_name
)
self._transport.add_middleware(otel_middleware)

metrics = self._environment.get("metrics")
if metrics:
import prometheus_client
Expand Down
227 changes: 227 additions & 0 deletions src/workflows/transport/middleware/otel_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from __future__ import annotations

import functools
from collections.abc import Callable

from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.propagate import extract, inject

from workflows.transport.common_transport import MessageCallback, TemporarySubscription


class OTELTracingMiddleware:
def __init__(self, tracer: trace.Tracer, service_name: str):
self.tracer = tracer
self.service_name = service_name

def _set_span_attributes(self, span, **attributes):
"""Helper method to set common span attributes"""
span.set_attribute("service_name", self.service_name)
for key, value in attributes.items():
if value is not None:
span.set_attribute(key, value)

def send(self, call_next: Callable, destination: str, message, **kwargs):
# Get current span context (may be None if this is the root span)
current_span = trace.get_current_span()
parent_context = (
trace.set_span_in_context(current_span) if current_span else None
)

with self.tracer.start_as_current_span(
"transport.send",
context=parent_context,
) as span:
self._set_span_attributes(span, destination=destination)

# Inject the current trace context into the message headers
headers = kwargs.get("headers", {})
if headers is None:
headers = {}
inject(headers) # This modifies headers in-place
kwargs["headers"] = headers

return call_next(destination, message, **kwargs)

def subscribe(
self, call_next: Callable, channel: str, callback: Callable, **kwargs
) -> int:
@functools.wraps(callback)
def wrapped_callback(header, message):
# Extract trace context from message headers
ctx = extract(header) if header else Context()

# Start a new span with the extracted context
with self.tracer.start_as_current_span(
"transport.subscribe",
context=ctx,
) as span:
self._set_span_attributes(span, channel=channel)

# Call the original callback - this will process the message
# and potentially call send() which will pick up this context
return callback(header, message)

return call_next(channel, wrapped_callback, **kwargs)

def subscribe_broadcast(
self, call_next: Callable, channel: str, callback: Callable, **kwargs
) -> int:
@functools.wraps(callback)
def wrapped_callback(header, message):
# Extract trace context from message headers
ctx = extract(header) if header else Context()

# Start a new span with the extracted context
with self.tracer.start_as_current_span(
"transport.subscribe_broadcast",
context=ctx,
) as span:
self._set_span_attributes(span, channel=channel)

return callback(header, message)

return call_next(channel, wrapped_callback, **kwargs)

def subscribe_temporary(
self,
call_next: Callable,
channel_hint: str | None,
callback: MessageCallback,
**kwargs,
) -> TemporarySubscription:
@functools.wraps(callback)
def wrapped_callback(header, message):
# Extract trace context from message headers
ctx = extract(header) if header else Context()

# Start a new span with the extracted context
with self.tracer.start_as_current_span(
"transport.subscribe_temporary",
context=ctx,
) as span:
self._set_span_attributes(span, channel_hint=channel_hint)

return callback(header, message)

return call_next(channel_hint, wrapped_callback, **kwargs)

def unsubscribe(
self,
call_next: Callable,
subscription: int,
drop_callback_reference=False,
**kwargs,
):
# Get current span context
current_span = trace.get_current_span()
current_context = (
trace.set_span_in_context(current_span) if current_span else Context()
)

with self.tracer.start_as_current_span(
"transport.unsubscribe",
context=current_context,
) as span:
self._set_span_attributes(span, subscription_id=subscription)

call_next(
subscription, drop_callback_reference=drop_callback_reference, **kwargs
)

def ack(
self,
call_next: Callable,
message,
subscription_id: int | None = None,
**kwargs,
):
# Get current span context
current_span = trace.get_current_span()
current_context = (
trace.set_span_in_context(current_span) if current_span else Context()
)

with self.tracer.start_as_current_span(
"transport.ack",
context=current_context,
) as span:
self._set_span_attributes(span, subscription_id=subscription_id)

call_next(message, subscription_id=subscription_id, **kwargs)

def nack(
self,
call_next: Callable,
message,
subscription_id: int | None = None,
**kwargs,
):
# Get current span context
current_span = trace.get_current_span()
current_context = (
trace.set_span_in_context(current_span) if current_span else Context()
)

with self.tracer.start_as_current_span(
"transport.nack",
context=current_context,
) as span:
self._set_span_attributes(span, subscription_id=subscription_id)

call_next(message, subscription_id=subscription_id, **kwargs)

def transaction_begin(
self, call_next: Callable, subscription_id: int | None = None, **kwargs
) -> int:
"""Start a new transaction span"""
# Get current span context (may be None if this is the root span)
current_span = trace.get_current_span()
current_context = (
trace.set_span_in_context(current_span) if current_span else Context()
)

with self.tracer.start_as_current_span(
"transaction.begin",
context=current_context,
) as span:
self._set_span_attributes(span, subscription_id=subscription_id)

return call_next(subscription_id=subscription_id, **kwargs)

def transaction_abort(
self, call_next: Callable, transaction_id: int | None = None, **kwargs
):
"""Abort a transaction span"""
# Get current span context
current_span = trace.get_current_span()
current_context = (
trace.set_span_in_context(current_span) if current_span else Context()
)

with self.tracer.start_as_current_span(
"transaction.abort",
context=current_context,
) as span:
self._set_span_attributes(span, transaction_id=transaction_id)

call_next(transaction_id=transaction_id, **kwargs)

def transaction_commit(
self, call_next: Callable, transaction_id: int | None = None, **kwargs
):
"""Commit a transaction span"""
# Get current span context
current_span = trace.get_current_span()
current_context = (
trace.set_span_in_context(current_span) if current_span else Context()
)

with self.tracer.start_as_current_span(
"transaction.commit",
context=current_context,
) as span:
self._set_span_attributes(span, transaction_id=transaction_id)

call_next(transaction_id=transaction_id, **kwargs)
20 changes: 20 additions & 0 deletions src/workflows/util/zocalo/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@
from workflows.transport.stomp_transport import StompTransport


class OTEL:
"""A Zocalo configuration plugin to pre-populate OTELTracing config defaults"""

class Schema(PluginSchema):
host = fields.Str(required=True)
port = fields.Int(required=True)
timeout = fields.Int(required=False, load_default=10)

# Store configuration for access by services
config = {}

@staticmethod
def activate(configuration):
# Build the full endpoint URL
endpoint = f"https://{configuration['host']}:{configuration['port']}/v1/traces"
OTEL.config["endpoint"] = endpoint
OTEL.config["timeout"] = configuration.get("timeout", 10)
return OTEL.config


class Stomp:
"""A Zocalo configuration plugin to pre-populate StompTransport config defaults"""

Expand Down
Loading