По умолчанию messages.create() ждёт пока модель полностью сформирует ответ и только потом возвращает результат. При длинных ответах это 5–15 секунд молчания. Streaming решает это: ответ приходит токен за токеном пока модель генерирует.
Как работает стриминг
Технически это HTTP keep-alive: сервер держит соединение открытым и отправляет чанки данных по мере генерации. SDK оборачивает это в удобный контекстный менеджер messages.stream().
Базовый пример
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "Объясни что такое API"}]
) as stream:
for text_chunk in stream.text_stream:
print(text_chunk, end="", flush=True)
print() # перенос строки после завершения
Ключевые детали:
- with ... as stream: — контекстный менеджер, закрывает соединение автоматически
- stream.text_stream — итератор по текстовым чанкам (строки)
- end="" — не добавлять перенос строки после каждого чанка
- flush=True — сразу выводить буфер, не ждать накопления
Получение usage после стриминга
with client.messages.stream(...) as stream:
for chunk in stream.text_stream:
print(chunk, end="", flush=True)
# После завершения — финальное сообщение с usage:
final = stream.get_final_message()
print()
print(f"Токены: {final.usage.input_tokens} in / {final.usage.output_tokens} out")
Стриминг с rich
from rich.console import Console
console = Console()
with client.messages.stream(...) as stream:
for chunk in stream.text_stream:
# highlight=False обязателен при стриминге:
console.print(chunk, end="", highlight=False)
highlight=False важен: rich пытается подсвечивать синтаксис в каждом чанке отдельно — это ломает вывод. Отключи при построчном стриминге.
Обработка прерывания
try:
with client.messages.stream(...) as stream:
for chunk in stream.text_stream:
print(chunk, end="", flush=True)
except KeyboardInterrupt:
print("\n[прервано]")
Стриминг событий (низкий уровень)
stream.text_stream — высокоуровневый итератор только по тексту. Для доступа ко всем событиям (start, delta, stop) используй stream:
with client.messages.stream(...) as stream:
for event in stream:
if hasattr(event, 'delta') and hasattr(event.delta, 'text'):
print(event.delta.text, end="", flush=True)
Когда использовать стриминг
Используй стриминг когда:
- Ответ длинный (>200 токенов)
- Пользователь ждёт у экрана
- Важен UX — ощущение «живого» ответа
Не используй стриминг когда:
- Обрабатываешь ответ программно (парсишь JSON)
- Делаешь батч-запросы в фоне
- Нужен только итоговый результат
Собираем полный текст при стриминге
chunks: list[str] = []
with client.messages.stream(...) as stream:
for chunk in stream.text_stream:
print(chunk, end="", flush=True)
chunks.append(chunk)
final = stream.get_final_message()
full_text = "".join(chunks) # весь ответ для сохранения в историю
💬 Комментарии (0)
Комментариев пока нет
Станьте первым, кто поделится мнением об этой статье!