By default, messages.create() waits until the model has fully generated its response before returning anything. For long responses this can mean 5–15 seconds of silence. Streaming solves this: the response arrives in small chunks of text as the model generates it.
How Streaming Works
Technically, the API uses server-sent events (SSE): the server holds the HTTP connection open and sends events as the text is generated. The SDK wraps this in a convenient context manager, messages.stream().
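To make the wire format concrete, here is a minimal sketch of parsing a server-sent event body by hand. The sample payloads are trimmed, illustrative stand-ins rather than a verbatim API response, and parse_sse is a hypothetical helper; in practice the SDK does this parsing for you.

```python
import json
from typing import Iterator

# Illustrative sample of a server-sent event stream; the payloads are
# trimmed stand-ins, not a verbatim API response.
SAMPLE_SSE = (
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hel"}}\n'
    '\n'
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "lo"}}\n'
    '\n'
    'event: message_stop\n'
    'data: {"type": "message_stop"}\n'
    '\n'
)


def parse_sse(raw: str) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, payload) pairs from a server-sent event body."""
    event_name = ""
    data_lines: list[str] = []
    for line in raw.splitlines():
        if line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and data_lines:
            # A blank line terminates one event.
            yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "", []


# Join the text carried by the delta events.
text = "".join(
    payload["delta"]["text"]
    for name, payload in parse_sse(SAMPLE_SSE)
    if name == "content_block_delta"
)
print(text)  # prints "Hello"
```

Each event is a small JSON document, which is why text can be shown as soon as the first delta arrives instead of after the whole response is complete.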
Basic Example
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain what an API is"}],
) as stream:
    for text_chunk in stream.text_stream:
        print(text_chunk, end="", flush=True)
print()  # newline after completion
Key details:
- with ... as stream: — context manager, closes the connection automatically
- stream.text_stream — iterator over text chunks (strings)
- end="" — do not add a newline after each chunk
- flush=True — flush the buffer immediately, don’t wait for it to fill up
Getting Usage After Streaming
with client.messages.stream(...) as stream:
    for chunk in stream.text_stream:
        print(chunk, end="", flush=True)
    # After the loop finishes, fetch the final message with usage stats
    # (still inside the with block, while the stream is open):
    final = stream.get_final_message()
print()
print(f"Tokens: {final.usage.input_tokens} in / {final.usage.output_tokens} out")
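If you make several streamed requests in one session, the per-message usage can be summed. The following is a minimal sketch: UsageTotals is a hypothetical helper, and the usage objects are stand-ins with made-up numbers in place of final.usage from the example above.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class UsageTotals:
    """Running token totals across several streamed requests."""
    input_tokens: int = 0
    output_tokens: int = 0

    def add(self, usage) -> None:
        # `usage` is any object exposing input_tokens / output_tokens,
        # such as final.usage from the example above.
        self.input_tokens += usage.input_tokens
        self.output_tokens += usage.output_tokens


totals = UsageTotals()
# Stand-in usage objects with made-up numbers; real values come from the API.
totals.add(SimpleNamespace(input_tokens=12, output_tokens=150))
totals.add(SimpleNamespace(input_tokens=170, output_tokens=80))
print(f"Session total: {totals.input_tokens} in / {totals.output_tokens} out")
```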
Streaming with rich
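from rich.console import Console

console = Console()
with client.messages.stream(...) as stream:
    for chunk in stream.text_stream:
        # highlight=False is required when streaming; markup=False also keeps
        # square brackets in the model's text from being read as rich markup:
        console.print(chunk, end="", highlight=False, markup=False)
highlight=False matters: rich tries to apply syntax highlighting to each chunk individually, which breaks the output. Disable it when streaming chunk by chunk. Passing markup=False is a further precaution, so that square brackets in the model's text are not interpreted as rich console markup.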
Handling Interruption
try:
    with client.messages.stream(...) as stream:
        for chunk in stream.text_stream:
            print(chunk, end="", flush=True)
except KeyboardInterrupt:
    print("\n[interrupted]")
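When the user interrupts a response, it is often useful to keep whatever text has already arrived. Here is a minimal sketch of that idea; consume and fake_stream are hypothetical names, and fake_stream simulates Ctrl+C so the example runs without an API call. In real use you would pass stream.text_stream instead.

```python
from typing import Iterable, Iterator


def consume(chunks: Iterable[str]) -> tuple[str, bool]:
    """Print chunks as they arrive; return (text_so_far, interrupted)."""
    received: list[str] = []
    try:
        for chunk in chunks:
            print(chunk, end="", flush=True)
            received.append(chunk)
    except KeyboardInterrupt:
        print("\n[interrupted]")
        return "".join(received), True
    print()
    return "".join(received), False


def fake_stream() -> Iterator[str]:
    # Hypothetical stand-in for stream.text_stream that simulates Ctrl+C.
    yield "Partial "
    yield "answer"
    raise KeyboardInterrupt


text, interrupted = consume(fake_stream())
```

Here text holds "Partial answer" and interrupted is True, so the caller can decide whether to save or discard the partial reply.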
Event Streaming (Low Level)
stream.text_stream is a high-level iterator over text only. To access all events (start, delta, stop), iterate over stream directly:
with client.messages.stream(...) as stream:
    for event in stream:
        # Text arrives in content_block_delta events that carry a text_delta
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)
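To experiment with event filtering without calling the API, you can run the same logic over hand-built objects. This is a sketch under assumptions: the fake events below carry only the fields used here and cover only a subset of the event types a real stream emits, and summarize_events is a hypothetical helper.

```python
from types import SimpleNamespace
from typing import Iterable


def summarize_events(events: Iterable) -> tuple[str, list[str]]:
    """Return (joined text, event types seen) for a sequence of stream events."""
    text_parts: list[str] = []
    seen: list[str] = []
    for event in events:
        seen.append(event.type)
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            text_parts.append(event.delta.text)
    return "".join(text_parts), seen


# Hand-built stand-ins for SDK event objects (only the fields used here).
fake_events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_start"),
    SimpleNamespace(type="content_block_delta",
                    delta=SimpleNamespace(type="text_delta", text="Hi ")),
    SimpleNamespace(type="content_block_delta",
                    delta=SimpleNamespace(type="text_delta", text="there")),
    SimpleNamespace(type="content_block_stop"),
    SimpleNamespace(type="message_stop"),
]

text, seen = summarize_events(fake_events)
print(text)  # prints "Hi there"
```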
When to Use Streaming
Use streaming when:
- The response is long (>200 tokens)
- A user is watching the screen
- UX matters — the feeling of a “live” response
Don’t use streaming when:
- You are processing the response programmatically (parsing JSON)
- You are making batch requests in the background
- You only need the final result
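The JSON case is easy to demonstrate: a partially received document cannot be parsed, so streaming gives no benefit until the last chunk arrives. The chunks below are hypothetical and only illustrate how a JSON reply might be split in transit.

```python
import json

# Hypothetical chunks, as a streamed JSON reply might arrive.
chunks = ['{"status": ', '"ok", "ite', 'ms": [1, 2', ', 3]}']

buffer = ""
parsable: list[bool] = []
for chunk in chunks:
    buffer += chunk
    try:
        json.loads(buffer)
        parsable.append(True)
    except json.JSONDecodeError:
        parsable.append(False)  # incomplete document so far

result = json.loads(buffer)
print(parsable)  # prints [False, False, False, True]
```

Only the complete buffer parses, which is why a plain messages.create() call is usually the simpler choice for machine-readable output.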
Collecting the Full Text While Streaming
chunks: list[str] = []
with client.messages.stream(...) as stream:
    for chunk in stream.text_stream:
        print(chunk, end="", flush=True)
        chunks.append(chunk)
    final = stream.get_final_message()
full_text = "".join(chunks)  # complete response to save to history
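Saving the reply to the conversation history could look like the sketch below. stream_to_history is a hypothetical helper, and the chunks are stand-ins so the example runs offline; in real use you would pass stream.text_stream.

```python
from typing import Iterable


def stream_to_history(chunks: Iterable[str], history: list[dict]) -> str:
    """Print chunks live, then append the joined reply to the history."""
    parts: list[str] = []
    for chunk in chunks:
        print(chunk, end="", flush=True)
        parts.append(chunk)
    print()
    full_text = "".join(parts)
    history.append({"role": "assistant", "content": full_text})
    return full_text


history = [{"role": "user", "content": "Explain what an API is"}]
# Stand-in chunks; in real use pass stream.text_stream instead.
reply = stream_to_history(["An API is ", "a contract ", "between programs."], history)
```

After the call, history ends with an assistant message containing the full reply, ready to be sent back as context on the next turn.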