В async коде несколько coroutines работают конкурентно в одном потоке. Между ними тоже могут быть race conditions — и asyncio предоставляет примитивы синхронизации.
asyncio.Queue — producer/consumer
import asyncio
async def producer(queue: asyncio.Queue, items: list):
for item in items:
await queue.put(item)
print(f"Положили: {item}")
await queue.put(None) # sentinel — сигнал завершения
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Обработали: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=3) # буфер на 3 элемента
await asyncio.gather(
producer(queue, range(10)),
consumer(queue),
)
asyncio.Lock — защита от race condition
counter = 0
lock = asyncio.Lock()
async def increment():
global counter
async with lock: # только одна coroutine за раз
value = counter
await asyncio.sleep(0) # точка переключения
counter = value + 1
async def main():
await asyncio.gather(*[increment() for _ in range(100)])
print(counter) # всегда 100, не 87 или 63
asyncio.Semaphore — rate limiting
Самый частый use case: ограничить количество одновременных HTTP запросов.
import httpx
async def fetch(sem: asyncio.Semaphore, client, url):
async with sem: # максимум 5 одновременных запросов
return await client.get(url)
async def main():
sem = asyncio.Semaphore(5)
async with httpx.AsyncClient() as client:
results = await asyncio.gather(
*[fetch(sem, client, url) for url in urls]
)
asyncio.Event — одноразовый сигнал
ready = asyncio.Event()
async def server():
await asyncio.sleep(1) # инициализация
ready.set() # сигнал: готов
async def client():
await ready.wait() # ждём сигнала
print("Подключились!")
async def main():
await asyncio.gather(server(), client())
💬 Комментарии (0)
Комментариев пока нет
Станьте первым, кто поделится мнением об этой статье!