feat(worker): worker.run() is now context managed#77
feat(worker): worker.run() is now context managed#77shijiesheng wants to merge 2 commits intocadence-workflow:mainfrom
Conversation
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
| worker = Worker(self.client(), self.test_name, registry, **kwargs) | ||
| async with worker.run(): | ||
| yield worker |
There was a problem hiding this comment.
⚠️ Bug: Client context manager dropped in integration test helper
The old code used async with self.client() as client: which properly called Client.__aenter__() (initializing the gRPC channel via ready()) and Client.__aexit__() (closing the channel via close()). The new code passes self.client() directly to Worker(...) without entering the context manager, so:
ready()is never called — the client may not be properly initialized.close()is never called — gRPC channel resources will leak after each test.
This will likely cause integration test failures or resource leaks.
Suggested fix:
@asynccontextmanager
async def worker(
self, registry: Registry, **kwargs: Unpack[WorkerOptions]
) -> AsyncGenerator[Worker, None]:
async with self.client() as client:
worker = Worker(client, self.test_name, registry, **kwargs)
async with worker.run():
yield worker
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| tasks.append(asyncio.create_task(self._decision_worker.run())) | ||
| if not self._options["disable_activity_worker"]: | ||
| tg.create_task(self._activity_worker.run()) | ||
| tasks.append(asyncio.create_task(self._activity_worker.run())) | ||
| yield self | ||
| finally: | ||
| for task in tasks: | ||
| task.cancel() | ||
| await asyncio.gather(*tasks, return_exceptions=True) |
There was a problem hiding this comment.
💡 Edge Case: Worker tasks may fail silently before yield completes
In Worker.run(), tasks are created via asyncio.create_task() and then yield self is reached. If a task raises an exception before the caller's async with block body executes (e.g., connection failure on first poll), that exception is silently swallowed by return_exceptions=True in the finally block's asyncio.gather(). With the old TaskGroup approach, such exceptions would propagate. Consider checking task results after gather and re-raising if any contain non-CancelledError exceptions.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Code Review
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
What changed?
worker.run is now a contextmanager and yields self when it's started and cancels the coroutines on exit
Why?
make it easier to use worker
How did you test it?
Unit Test
Potential risks
Release notes
Documentation Changes