Skip to content

feat(worker): worker.run() is now context managed#77

Open
shijiesheng wants to merge 2 commits intocadence-workflow:mainfrom
shijiesheng:worker-context
Open

feat(worker): worker.run() is now context managed#77
shijiesheng wants to merge 2 commits intocadence-workflow:mainfrom
shijiesheng:worker-context

Conversation

@shijiesheng
Copy link
Member

What changed?

worker.run is now a contextmanager and yields self when it's started and cancels the coroutines on exit

Why?

make it easier to use worker

How did you test it?

Unit Test

Potential risks

Release notes

Documentation Changes

Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
Comment on lines +25 to 27
worker = Worker(self.client(), self.test_name, registry, **kwargs)
async with worker.run():
yield worker
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: Client context manager dropped in integration test helper

The old code used async with self.client() as client: which properly called Client.__aenter__() (initializing the gRPC channel via ready()) and Client.__aexit__() (closing the channel via close()). The new code passes self.client() directly to Worker(...) without entering the context manager, so:

  1. ready() is never called — the client may not be properly initialized.
  2. close() is never called — gRPC channel resources will leak after each test.

This will likely cause integration test failures or resource leaks.

Suggested fix:

@asynccontextmanager
async def worker(
    self, registry: Registry, **kwargs: Unpack[WorkerOptions]
) -> AsyncGenerator[Worker, None]:
    async with self.client() as client:
        worker = Worker(client, self.test_name, registry, **kwargs)
        async with worker.run():
            yield worker

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

Comment on lines +43 to +50
tasks.append(asyncio.create_task(self._decision_worker.run()))
if not self._options["disable_activity_worker"]:
tg.create_task(self._activity_worker.run())
tasks.append(asyncio.create_task(self._activity_worker.run()))
yield self
finally:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: Worker tasks may fail silently before yield completes

In Worker.run(), tasks are created via asyncio.create_task() and then yield self is reached. If a task raises an exception before the caller's async with block body executes (e.g., connection failure on first poll), that exception is silently swallowed by return_exceptions=True in the finally block's asyncio.gather(). With the old TaskGroup approach, such exceptions would propagate. Consider checking task results after gather and re-raising if any contain non-CancelledError exceptions.

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

@gitar-bot
Copy link

gitar-bot bot commented Mar 24, 2026

Code Review ⚠️ Changes requested 0 resolved / 2 findings

Adds context manager support to worker.run() but the integration test helper drops the client context manager, and worker tasks may fail silently before yield completes.

⚠️ Bug: Client context manager dropped in integration test helper

📄 tests/integration_tests/helper.py:25-27

The old code used async with self.client() as client: which properly called Client.__aenter__() (initializing the gRPC channel via ready()) and Client.__aexit__() (closing the channel via close()). The new code passes self.client() directly to Worker(...) without entering the context manager, so:

  1. ready() is never called — the client may not be properly initialized.
  2. close() is never called — gRPC channel resources will leak after each test.

This will likely cause integration test failures or resource leaks.

Suggested fix
@asynccontextmanager
async def worker(
    self, registry: Registry, **kwargs: Unpack[WorkerOptions]
) -> AsyncGenerator[Worker, None]:
    async with self.client() as client:
        worker = Worker(client, self.test_name, registry, **kwargs)
        async with worker.run():
            yield worker
💡 Edge Case: Worker tasks may fail silently before yield completes

📄 cadence/worker/_worker.py:43-50

In Worker.run(), tasks are created via asyncio.create_task() and then yield self is reached. If a task raises an exception before the caller's async with block body executes (e.g., connection failure on first poll), that exception is silently swallowed by return_exceptions=True in the finally block's asyncio.gather(). With the old TaskGroup approach, such exceptions would propagate. Consider checking task results after gather and re-raising if any contain non-CancelledError exceptions.

🤖 Prompt for agents
Code Review: Adds context manager support to worker.run() but the integration test helper drops the client context manager, and worker tasks may fail silently before yield completes.

1. ⚠️ Bug: Client context manager dropped in integration test helper
   Files: tests/integration_tests/helper.py:25-27

   The old code used `async with self.client() as client:` which properly called `Client.__aenter__()` (initializing the gRPC channel via `ready()`) and `Client.__aexit__()` (closing the channel via `close()`). The new code passes `self.client()` directly to `Worker(...)` without entering the context manager, so:
   1. `ready()` is never called — the client may not be properly initialized.
   2. `close()` is never called — gRPC channel resources will leak after each test.
   
   This will likely cause integration test failures or resource leaks.

   Suggested fix:
   @asynccontextmanager
   async def worker(
       self, registry: Registry, **kwargs: Unpack[WorkerOptions]
   ) -> AsyncGenerator[Worker, None]:
       async with self.client() as client:
           worker = Worker(client, self.test_name, registry, **kwargs)
           async with worker.run():
               yield worker

2. 💡 Edge Case: Worker tasks may fail silently before yield completes
   Files: cadence/worker/_worker.py:43-50

   In `Worker.run()`, tasks are created via `asyncio.create_task()` and then `yield self` is reached. If a task raises an exception before the caller's `async with` block body executes (e.g., connection failure on first poll), that exception is silently swallowed by `return_exceptions=True` in the `finally` block's `asyncio.gather()`. With the old `TaskGroup` approach, such exceptions would propagate. Consider checking task results after gather and re-raising if any contain non-CancelledError exceptions.

Options

Auto-apply is off → Gitar will not commit updates to this branch.
Display: compact → Showing less information.

Comment with these commands to change:

Auto-apply Compact
gitar auto-apply:on         
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant