feat: Implement activity heartbeat#73
Conversation
Signed-off-by: Tim Li <ltim@uber.com>
🔍 CI failure analysis for f9e8396: Code formatting check failed: the new test file tests/integration_tests/workflow/test_heartbeat.py violates Black code style rules and needs to be reformatted before merge.OverviewCI analysis found 1 code formatting violation in the activity heartbeat feature PR. The build completed successfully through dependency installation and protobuf generation, but failed at the final Black code style validation step. FailuresBlack Code Formatter Violation (confidence: high)
Summary
Code Review
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
| return self._info | ||
|
|
||
| def heartbeat(self, *details: Any) -> None: | ||
| asyncio.ensure_future(self._heartbeat_sender.send_heartbeat(*details)) |
There was a problem hiding this comment.
⚠️ Bug: Fire-and-forget heartbeat silently swallows RPC errors
_Context.heartbeat() uses asyncio.ensure_future() which creates a fire-and-forget task. If the heartbeat RPC fails (e.g., activity cancelled by the server, network error, or timeout exceeded), the exception is silently discarded — the user gets no feedback that their heartbeat failed.
In Cadence, a heartbeat failure (specifically CancellationAlreadyRequested) is the primary mechanism to notify a long-running activity that it has been cancelled. Silently dropping these errors means activities cannot respond to cancellation requests, which defeats a key purpose of heartbeating.
Additionally, Python will log Task exception was never retrieved warnings for unhandled exceptions in fire-and-forget tasks, polluting logs.
Suggested fix:
Consider awaiting the heartbeat or adding an exception handler callback:
def heartbeat(self, *details: Any) -> None:
task = asyncio.ensure_future(
self._heartbeat_sender.send_heartbeat(*details)
)
task.add_done_callback(self._on_heartbeat_done)
@staticmethod
def _on_heartbeat_done(task: asyncio.Task) -> None:
if task.exception() is not None:
logger.warning("Heartbeat failed: %s", task.exception())
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| return _Context(self._client, info, activity_def, heartbeat_sender) | ||
| else: | ||
| return _SyncContext(self._client, info, activity_def, self._thread_pool) | ||
| loop = asyncio.get_running_loop() |
There was a problem hiding this comment.
no need to pass it in. You can get loop anywhere inside the loop
| class _HeartbeatSender: | ||
| def __init__( | ||
| self, | ||
| worker_stub: Any, |
There was a problem hiding this comment.
Nit: Use the Client type rather than Any
| from cadence.data_converter import DataConverter | ||
|
|
||
|
|
||
| class _HeartbeatSender: |
There was a problem hiding this comment.
I like that this is a separate class, we'll probably want two main things in the future:
- Automatic heartbeating
- Handling the heartbeat response to cancel the activity execution.
So the scope of this will definitely grow.
| return self._info | ||
|
|
||
| def heartbeat(self, *details: Any) -> None: | ||
| asyncio.ensure_future(self._heartbeat_sender.send_heartbeat(*details)) |
There was a problem hiding this comment.
Nit: We should log errors. Long term we need to handle errors to terminate the activity execution.
What changed?
Implement the activity heartbeat feature
Why?
Heartbeat is a crucial component that will inform the server about client status, especially useful in long-running activities
How did you test it?
Unit test
Potential risks
Release notes
Documentation Changes