feat: add Celery integration and improve PostHog client fork safety #464
parinporecha wants to merge 4 commits into PostHog:master
Conversation
This is a comment left during a code review.
Path: posthog/integrations/celery.py
Line: 225-248
Comment:
**Context leak if `_on_task_prerun` raises after entering context**
If any code between line 233 (`context_manager.__enter__()`) and the end of the try block throws an exception, the `except` on line 247 swallows it but the context is never exited — it remains pushed onto the `contextvars` stack for the remainder of the thread's life. This corrupts context state for subsequent tasks in the same worker thread.
For example, if `self._apply_propagated_identity(request)` (line 239) or `self._capture_event(...)` (line 246) raises, the context will leak. Similarly, if `request` is `None`, the context manager is entered but never stored on `request._posthog_ctx`, so `_handle_task_end`'s finally block can never exit it.
Consider cleaning up the context in the `except` block:
```python
def _on_task_prerun(self, *args, **kwargs):
    context_manager = None
    try:
        task_id = kwargs.get("task_id")
        if not task_id:
            return

        sender = kwargs.get("sender")
        request = getattr(sender, "request", None)
        context_tags = self._extract_propagated_tags(request)
        task_properties = self._build_task_properties(
            sender=sender,
            task_id=task_id,
            state="started",
        )
        task_name = task_properties.get("celery_task_name")

        context_manager = contexts.new_context(
            fresh=True,
            capture_exceptions=False,
            client=self.client,
        )
        context_manager.__enter__()
        if request is not None:
            request._posthog_ctx = context_manager
            request._posthog_start = time.monotonic()

        self._apply_propagated_identity(request)
        merged_tags = {**task_properties, **context_tags}
        for key, value in merged_tags.items():
            contexts.tag(key, value)

        if self.capture_task_lifecycle_events and self._should_track(task_name, task_properties):
            self._capture_event("celery task started", properties=task_properties)
    except Exception:
        logger.exception("Failed to process Celery task_prerun")
        # Exit the context if it was entered, so it doesn't leak onto the
        # contextvars stack for the rest of this worker thread's life.
        if context_manager is not None:
            try:
                context_manager.__exit__(None, None, None)
            except Exception:
                pass
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: posthog/client.py
Line: 336-337
Comment:
**`register_at_fork` prevents Client garbage collection**
`os.register_at_fork` callbacks cannot be unregistered. The bound method `self._reinit_after_fork` holds a strong reference to `self`, which means:
1. Every `Client` instance registered here will never be garbage collected for the lifetime of the process.
2. If multiple `Client` instances are created (e.g., in tests, or per-request patterns), each fork will run *all* accumulated callbacks, including for defunct/shutdown clients.
A common mitigation is to use a `weakref` callback so the client can still be collected:
```python
import weakref

# in __init__, replace the current register_at_fork block with:
if hasattr(os, "register_at_fork"):
    weak_self = weakref.ref(self)

    def _reinit_child():
        client = weak_self()
        if client is not None:
            client._reinit_after_fork()

    os.register_at_fork(after_in_child=_reinit_child)
```
This way, if the Client is no longer referenced, the callback becomes a no-op rather than keeping the entire Client alive.
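The weakref behavior this relies on can be demonstrated without forking. This is a standalone sketch (the `Client` class and `make_fork_callback` helper here are illustrative stand-ins, not the SDK's code): once the last strong reference to the client is dropped, `weak_self()` returns `None` and the registered callback degrades to a no-op.

```python
import gc
import weakref


class Client:
    """Stand-in client; counts how many times reinit ran."""
    reinit_calls = 0

    def _reinit_after_fork(self):
        Client.reinit_calls += 1


def make_fork_callback(client):
    # Hold only a weak reference, so this callback never keeps the client
    # alive (os.register_at_fork callbacks cannot be unregistered).
    weak_self = weakref.ref(client)

    def _reinit_child():
        live = weak_self()
        if live is not None:
            live._reinit_after_fork()

    return _reinit_child


client = Client()
callback = make_fork_callback(client)
callback()                 # client alive: reinit runs
assert Client.reinit_calls == 1

del client
gc.collect()               # drop the last strong reference
callback()                 # now a no-op; the client was collected
assert Client.reinit_calls == 1
```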
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 84b314c
dustinbyrne
left a comment
Hey @parinporecha, love the direction here. I left a few questions/considerations below, let me know what you think!
```python
def _reinit_after_fork(self):
    """Reinitialize queue and consumer threads in a forked child process.

    Registered via os.register_at_fork(after_in_child=...) so it runs
    exactly once in each child, before any user code, covering all code
    paths (capture, flush, join, etc.).

    Python threads do not survive fork() and queue.Queue internal locks
    may be in an inconsistent state, so both are replaced.
    Inherited queue items are intentionally discarded as they'll be
    handled by the parent process's consumers.
    """
```
One additional consideration here is that urllib3 sessions contain sockets that will be shared after fork. They're not mutex controlled, so they should also be recreated.
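A minimal sketch of that idea (the `ForkSafeClient` class and its `_session` / `_make_session` names are assumptions for illustration, not the SDK's actual attributes): the fork handler simply discards the inherited session object and builds a fresh one, so the child never reuses sockets shared with the parent.

```python
class ForkSafeClient:
    """Toy stand-in showing only the session-recreation idea."""

    def __init__(self):
        self._session = self._make_session()

    def _make_session(self):
        # In the real SDK this would be e.g. a urllib3 pool / HTTP session;
        # a plain object stands in here.
        return object()

    def _reinit_after_fork(self):
        # Discard the inherited session: its sockets are shared with the
        # parent process and are not safe to use from the child.
        self._session = self._make_session()


client = ForkSafeClient()
inherited = client._session
client._reinit_after_fork()   # what os.register_at_fork would invoke in the child
assert client._session is not inherited
```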
```python
# Worker process setup.
# Celery's default prefork pool runs tasks in child processes, so initialize
# PostHog per child using worker_process_init.
```
This seems like a non-issue given that `_reinit_after_fork` exists.
```python
@worker_process_init.connect
def on_worker_process_init(**kwargs) -> None:
    worker_posthog_client = create_client()
    worker_integration = create_integration(worker_posthog_client)
    worker_integration.instrument()

    app._posthog_client = worker_posthog_client
    app._posthog_integration = worker_integration
```
We're also calling `instrument()` at the module level. Is calling it again from the worker necessary?

Regarding the client, consider using the singleton `posthog` instance for simplicity. Type checking won't be happy about us writing directly to the Celery app.
```python
@worker_process_shutdown.connect
def on_worker_process_shutdown(**kwargs) -> None:
    worker_integration = getattr(app, "_posthog_integration", None)
    if worker_integration:
        worker_integration.uninstrument()

    worker_posthog_client = getattr(app, "_posthog_client", None)
    if worker_posthog_client:
        worker_posthog_client.shutdown()
```
Could this logic be part of `instrument()`? Ensuring we shut down properly seems like a good idea; otherwise we could be losing event data.
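One hedged sketch of that suggestion (the `owns_client` flag and `DummyClient` class are illustrative, not the SDK's API): `instrument()` records whether the integration created its own client, and `uninstrument()` shuts that client down, so queued events are flushed even if the app never wires up a shutdown signal itself.

```python
class DummyClient:
    """Stand-in for the PostHog client; only tracks shutdown."""

    def __init__(self):
        self.shut_down = False

    def shutdown(self):
        self.shut_down = True


class CeleryIntegration:
    def __init__(self, client=None):
        # If no client is passed in, the integration creates (and owns) one,
        # and therefore becomes responsible for shutting it down.
        self.owns_client = client is None
        self.client = client or DummyClient()

    def instrument(self):
        # ... connect Celery signals here ...
        return self

    def uninstrument(self):
        # ... disconnect signals, then release the client we own so no
        # queued events are lost ...
        if self.owns_client:
            self.client.shutdown()


integration = CeleryIntegration().instrument()
integration.uninstrument()
assert integration.client.shut_down
```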
Co-authored-by: Dustin Byrne <dustinsbyrne@gmail.com>
Summary

This PR:
- Adds `PosthogCeleryIntegration` to automatically capture Celery task lifecycle events and exceptions.
- Propagates PostHog context (`distinct_id`, `session_id`, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.
- Makes `Client` safer across process forks by reinitializing async consumers in child processes.

Context
I saw users asking for advice on how to use PostHog with Celery for error tracking in community questions and realized that there's currently no first-class way to instrument Celery workloads with PostHog.
That leaves a few gaps: no automatic capture of task lifecycle events or failures, and no way to correlate worker-side events with the originating user or session.
This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.
The integration takes inspiration from OpenTelemetry's Celery instrumentor, and PostHog context propagation is achieved through task headers, mirroring Sentry's and Datadog's Celery integrations.
While testing this, I found a separate SDK issue: when a `Client` configured in async mode is inherited across a process fork, the child process inherits a client whose consumer threads no longer exist. In practice, that means worker-side events don't get delivered reliably.

To make it safer, this PR also adds fork handling to `Client` by reinitializing its queue and consumers in the child process via `os.register_at_fork`. That said, the recommended setup for Celery remains to initialize a fresh `Client` and instrument `PosthogCeleryIntegration` inside each worker process, as shown in the example.

Changes
New: Celery Integration (`posthog/integrations/celery.py`)
- Hooks into Celery signals (`task_prerun`, `task_success`, `task_failure`, etc.) to capture events like `celery task started`, `celery task success`, etc. Check the docstring in the integration module for the complete list of supported events.
- `_on_before_task_publish`: injects the current PostHog context (distinct_id, session_id, tags) into task headers.
- `_on_task_prerun`: extracts those headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.

Refactored: Client Fork Safety
- Adds a `_reinit_after_fork` method to `posthog/client.py` that resets the internal queue and spins up new consumers in a child process.
- Registers it via `os.register_at_fork` (on supported platforms) so it is called automatically and the SDK does not drop captured events when used in child processes.

Examples
- `examples/celery_integration.py`

Tests
- `posthog/test/integrations/test_celery_integration.py` covering the new integration.
- Tests in `posthog/test/test_client.py` for `_reinit_after_fork` to verify consumers are restarted correctly.
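A hedged sketch of what such a `_reinit_after_fork` test can look like (the `queue`/`consumers` attribute names and `FakeClient` are assumptions about the client's internals, not the real test code): after calling the method, the client should hold a fresh, empty queue and fresh consumers.

```python
import queue


class FakeConsumer:
    def __init__(self):
        self.running = True


class FakeClient:
    """Minimal stand-in mimicking the client's queue/consumer layout."""

    def __init__(self):
        self.queue = queue.Queue()
        self.consumers = [FakeConsumer()]

    def _reinit_after_fork(self):
        # Replace the queue (its locks may be inconsistent after fork)
        # and start fresh consumers (threads don't survive fork).
        self.queue = queue.Queue()
        self.consumers = [FakeConsumer()]


def test_reinit_after_fork_replaces_queue_and_consumers():
    client = FakeClient()
    old_queue, old_consumers = client.queue, client.consumers
    client.queue.put({"event": "inherited"})  # should be discarded

    client._reinit_after_fork()

    assert client.queue is not old_queue
    assert client.queue.empty()
    assert client.consumers is not old_consumers


test_reinit_after_fork_replaces_queue_and_consumers()
```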
Celery task lifecycle events and captured Exception -

Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -

Captured exception -
