Three things you absolutely need in production async code.
asyncio.timeout() — painless timeouts (Python 3.11+)
import asyncio
async def slow_operation():
await asyncio.sleep(10)
async def main():
try:
async with asyncio.timeout(3.0): # 3 seconds max
await slow_operation()
except TimeoutError:
print("Timeout exceeded")
Before Python 3.11 asyncio.wait_for() was used:
try:
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
except asyncio.TimeoutError:
print("Timeout")
CancelledError — task cancellation
async def worker():
try:
while True:
await do_work()
except asyncio.CancelledError:
await cleanup() # release resources
raise # IMPORTANT: always re-raise CancelledError
async def main():
task = asyncio.create_task(worker())
await asyncio.sleep(5)
task.cancel() # send CancelledError into task
try:
await task
except asyncio.CancelledError:
print("Task cancelled cleanly")
Rule: in except CancelledError always re-raise. Otherwise the task won’t finish correctly.
Graceful Shutdown — clean termination
import asyncio, signal
async def main():
loop = asyncio.get_running_loop()
stop = asyncio.Event()
def handle_signal():
print("Got SIGINT, shutting down...")
stop.set()
loop.add_signal_handler(signal.SIGINT, handle_signal)
loop.add_signal_handler(signal.SIGTERM, handle_signal)
# Main work
tasks = [asyncio.create_task(worker(i)) for i in range(5)]
await stop.wait() # wait for signal
# Cancel all tasks
for task in tasks:
task.cancel()
# Wait for completion with cleanup
await asyncio.gather(*tasks, return_exceptions=True)
print("Shut down cleanly")
asyncio.run(main())
return_exceptions=True in gather() matters: without it the first CancelledError will interrupt waiting for the other tasks.
💬 Comments (0)
No comments yet
Be the first to share your opinion about this article!