What's needed?
Sometimes it is useful to ensure that any futures still pending when a block of code exits are cancelled.
Proposed solution
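import asyncio
from asyncio import Future
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def cancelling_pending(*futures: Future[Any]) -> AsyncIterator[None]:
    try:
        yield
    finally:
        # Cancel whatever is still pending, then await it so the
        # cancellation has been processed before the block is left.
        for future in futures:
            if not future.done():
                future.cancel()
                try:
                    await future
                except asyncio.CancelledError:
                    pass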
Example usage:
class Timer:
    ...

    async def ready(self) -> None:
        ...
        while time_to_next_tick > 0:
            tasks = [
                asyncio.create_task(asyncio.sleep(time_to_next_tick)),
                asyncio.create_task(self._reset_event.wait()),
            ]
            # cancelling_pending takes the futures as separate arguments,
            # so the list has to be unpacked.
            async with cancelling_pending(*tasks):
                await next(asyncio.as_completed(tasks))
            self._reset_event.clear()
            now = self._now()
            time_to_next_tick = self._next_tick_time - now
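The Timer fragment above is elided, so here is a minimal self-contained sketch of the same pattern that can be run directly. It repeats the proposed helper and races a sleep against an event that is never set. The function name race_sleep_against_event and the 0.01 second timeout are assumptions made for illustration only, not part of the proposal.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def cancelling_pending(*futures: "asyncio.Future[Any]") -> AsyncIterator[None]:
    """Cancel, and wait for, every future still pending when the block exits."""
    try:
        yield
    finally:
        for future in futures:
            if not future.done():
                future.cancel()
                try:
                    await future
                except asyncio.CancelledError:
                    pass


async def race_sleep_against_event(timeout: float) -> tuple[bool, bool]:
    """Hypothetical demo: return (sleep_cancelled, event_wait_cancelled)."""
    event = asyncio.Event()
    sleeper = asyncio.create_task(asyncio.sleep(timeout))
    waiter = asyncio.create_task(event.wait())
    async with cancelling_pending(sleeper, waiter):
        # Nothing ever sets the event, so only the sleep can finish first.
        await asyncio.wait({sleeper, waiter}, return_when=asyncio.FIRST_COMPLETED)
    # By this point the helper has cancelled and awaited the loser.
    return sleeper.cancelled(), waiter.cancelled()


if __name__ == "__main__":
    print(asyncio.run(race_sleep_against_event(0.01)))  # (False, True)
```

The sleep task finishes normally and is left alone, while the event waiter is cancelled and awaited before the `async with` block returns control.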
Use cases
Alternatives and workarounds
_, pending = await asyncio.wait(
    [
        asyncio.create_task(asyncio.sleep(time_to_next_tick)),
        asyncio.create_task(self._reset_event.wait()),
    ],
    return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
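For comparison, the workaround above could be wrapped in a small function so the cleanup loop is not repeated at every call site. This is only a sketch: wait_first_cancel_rest and demo_wait_first are hypothetical names, and the event that is never set stands in for self._reset_event.

```python
import asyncio
from typing import Any, Awaitable, Set


async def wait_first_cancel_rest(*aws: Awaitable[Any]) -> Set["asyncio.Future[Any]"]:
    """Hypothetical helper: wait for the first awaitable to finish,
    cancel and await the rest, and return the finished ones."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return done


async def demo_wait_first() -> tuple[int, bool]:
    """Return (number of finished tasks, whether the event waiter was cancelled)."""
    event = asyncio.Event()  # never set, so the sleep always wins
    waiter = asyncio.create_task(event.wait())
    done = await wait_first_cancel_rest(asyncio.sleep(0.01), waiter)
    return len(done), waiter.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo_wait_first()))  # (1, True)
```

Unlike the proposed context manager, this version does not clean up if the code between task creation and cleanup raises, which is the gap the `finally` block in cancelling_pending is meant to close.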
Additional context
No response