AsyncAnthropic is the asynchronous version of the Anthropic client. It uses async/await and integrates with the asyncio event loop.
Differences from the Synchronous Client
# Synchronous (blocks the thread)
import anthropic
client = anthropic.Anthropic()
response = client.messages.create(...)
# Asynchronous (non-blocking)
import anthropic
client = anthropic.AsyncAnthropic()
response = await client.messages.create(...)
The API is identical — you just add await.
Basic Usage
import asyncio
import anthropic
from environs import Env
env = Env()
env.read_env()
client = anthropic.AsyncAnthropic(api_key=env.str("ANTHROPIC_API_KEY"))
async def chat(message: str) -> str:
response = await client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": message}]
)
return response.content[0].text
async def main():
answer = await chat("Объясни async/await за 3 предложения")
print(answer)
asyncio.run(main())
Async Streaming
async def stream_chat(message: str) -> str:
chunks = []
async with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": message}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
chunks.append(text)
final = await stream.get_final_message()
print()
return "".join(chunks)
asyncio.gather() — Parallel Requests
async def analyze_batch(texts: list[str]) -> list[str]:
"""Sends all requests in parallel."""
async def analyze_one(text: str) -> str:
response = await client.messages.create(
model="claude-sonnet-4-6",
max_tokens=256,
messages=[{"role": "user", "content": f"Тональность: {text}. Одним словом."}]
)
return response.content[0].text.strip()
return await asyncio.gather(*[analyze_one(t) for t in texts])
async def main():
texts = ["Отличный продукт!", "Ужасный сервис", "Нормально, ничего особенного"]
results = await analyze_batch(texts)
for text, sentiment in zip(texts, results):
print(f"{sentiment}: {text}")
asyncio.run(main())
Error Handling
async def safe_chat(message: str) -> str | None:
try:
response = await client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
messages=[{"role": "user", "content": message}]
)
return response.content[0].text
except anthropic.RateLimitError:
await asyncio.sleep(5)
return await safe_chat(message) # retry
except anthropic.APIError as e:
print(f"Ошибка API: {e}")
return None
💬 Comments (0)
No comments yet
Be the first to share your opinion about this article!