In async code, multiple coroutines run concurrently in a single thread. Race conditions can still happen whenever a coroutine suspends at an `await` between reading and writing shared state, so asyncio provides synchronization primitives.
asyncio.Queue — producer/consumer
import asyncio
from collections.abc import Iterable
async def producer(queue: asyncio.Queue, items: Iterable) -> None:
    """Put every element of ``items`` on ``queue``, then a ``None`` sentinel.

    Args:
        queue: Queue to feed. Each ``put`` is awaited, so on a bounded queue
            this coroutine suspends while the queue is full.
        items: Any iterable of work items (a list, a ``range``, a
            generator, ...). A ``None`` item would be indistinguishable from
            the completion sentinel put at the end.
    """
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # Sentinel: signals that no more items will arrive.
    await queue.put(None)
async def consumer(queue: asyncio.Queue) -> None:
    """Take items off ``queue`` and print them until a ``None`` sentinel arrives.

    Every retrieved item, including the sentinel, is acknowledged with
    ``task_done()`` so that ``await queue.join()`` can return once the
    consumer has stopped.

    Args:
        queue: Queue to drain; a ``None`` item ends the loop.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            print(f"Consumed: {item}")
        finally:
            # Runs on the sentinel path too: otherwise the unfinished-task
            # count never reaches zero and queue.join() would block forever.
            queue.task_done()
async def main():
    """Run one producer and one consumer concurrently over a shared queue."""
    # maxsize=3 bounds the buffer to three items.
    queue = asyncio.Queue(maxsize=3)
    producing = producer(queue, range(10))
    consuming = consumer(queue)
    await asyncio.gather(producing, consuming)
asyncio.Lock — race condition protection
counter = 0
lock = asyncio.Lock()
async def increment():
global counter
async with lock: # only one coroutine at a time
value = counter
await asyncio.sleep(0) # switching point
counter = value + 1
async def main():
    """Run 100 ``increment`` coroutines concurrently and print the counter."""
    pending = [increment() for _ in range(100)]
    await asyncio.gather(*pending)
    # Expected to print 100: increment() does its read-modify-write under
    # the lock, so no update is lost.
    print(counter)
asyncio.Semaphore — limiting concurrency
Most common use case: limit the number of concurrent HTTP requests.
import httpx
async def fetch(sem: asyncio.Semaphore, client, url):
    """GET ``url`` through ``client`` while holding one slot of ``sem``.

    Args:
        sem: Semaphore shared by all callers; its initial value caps how
            many requests are in flight at once.
        client: Object exposing an awaitable ``get(url)``.
        url: Address to request.

    Returns:
        Whatever ``client.get(url)`` resolves to.
    """
    async with sem:
        response = await client.get(url)
    return response
async def main():
    """Fetch every URL in ``urls`` with at most five requests in flight.

    NOTE(review): ``urls`` is not defined in this block; it is read as a
    module-level name and must be supplied by the surrounding code.

    Returns:
        The list produced by ``asyncio.gather``: one ``fetch`` result per
        URL, in the same order as ``urls``.
    """
    sem = asyncio.Semaphore(5)
    async with httpx.AsyncClient() as client:
        # Hand the responses back to the caller instead of binding them to
        # an unused local.
        return await asyncio.gather(
            *[fetch(sem, client, url) for url in urls]
        )
asyncio.Event — one-shot signal
ready = asyncio.Event()
async def server():
await asyncio.sleep(1) # initialization
ready.set() # signal: ready
async def client():
await ready.wait() # wait for signal
print("Connected!")
async def main():
    """Run ``server`` and ``client`` concurrently until both finish."""
    serving = server()
    connecting = client()
    await asyncio.gather(serving, connecting)
💬 Comments (0)
No comments yet
Be the first to share your opinion about this article!