How streaming works, why it matters for UX, and how to implement it in your app.
Without streaming, your user submits a prompt and stares at a spinner for 5-15 seconds while the model generates the full response. With streaming, text starts appearing within half a second and keeps coming. The total time is the same, but the perceived experience is dramatically better.
This isn't just aesthetics. Research on user interfaces consistently shows that progressive feedback — even if slower overall — is preferred to long waits followed by instant output. For AI applications, streaming is essentially non-negotiable for production quality.
Streaming in AI APIs uses Server-Sent Events (SSE), a simple protocol in which the server sends a series of newline-delimited `data:` lines over a single long-lived HTTP connection, and the client reads them as they arrive.
A streaming response looks like this at the wire level:
```
data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello"}}
data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": ", how"}}
data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": " can I"}}
data: [DONE]
```
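Each `data:` line carries a JSON object containing a chunk of the response. The exception is an end-of-stream sentinel such as `[DONE]`, which OpenAI's API sends as its final line; Anthropic's API signals the end with a `message_stop` event instead. Your code reads these events in a loop and processes each chunk.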
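To make that loop concrete, here is a minimal sketch of a parser for the simplified wire format shown above. The function name `iter_sse_data` and the sample lines are illustrative assumptions, not part of any SDK. It assumes one JSON object per `data:` line and treats a literal `[DONE]` payload as the end of the stream. In practice the official client libraries do this parsing for you.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each `data:` line.

    Assumes one JSON object per data line and stops at a [DONE] sentinel.
    """
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)


# Hypothetical sample input, shaped like the wire-level example above.
sample = [
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello"}}',
    "",
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": ", how"}}',
    "",
    "data: [DONE]",
]

text = "".join(event["delta"]["text"] for event in iter_sse_data(sample))
print(text)
```

Because the function accepts any iterable of lines, the same code works whether the lines come from a test list, a file, or a line iterator over an HTTP response.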
```python
import anthropic

client = anthropic.Anthropic()

# Simple text streaming
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain how HTTP keep-alive works."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # newline after stream ends

# Access the final message after streaming
# (the ... stands for the same arguments as in the call above)
with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final_message = stream.get_final_message()
    print(f"\nTotal tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")
```
For full event handling (useful for tool use in streaming):
```python
# Uses the client created above; match/case requires Python 3.10+
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "..."}]
) as stream:
    for event in stream:
        match event.type:
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
            case "message_stop":
                print("\nStream complete")
```
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a short poem about APIs."}],
    stream=True,
)

for chunk in stream:
    if not chunk.choices:
        continue  # some chunks (for example usage-only chunks) carry no choices
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
    if chunk.choices[0].finish_reason:
        print(f"\nFinish reason: {chunk.choices[0].finish_reason}")
```
For tool calls in streaming with OpenAI, you receive delta chunks that build up the tool call arguments:
```python
# Accumulate tool-call fragments by index; arguments arrive as partial JSON strings
tool_call_chunks = {}
for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index not in tool_call_chunks:
                tool_call_chunks[tc.index] = {"id": "", "name": "", "arguments": ""}
            if tc.id:
                tool_call_chunks[tc.index]["id"] += tc.id
            # function can be absent on a fragment, so guard before reading it
            if tc.function and tc.function.name:
                tool_call_chunks[tc.index]["name"] += tc.function.name
            if tc.function and tc.function.arguments:
                tool_call_chunks[tc.index]["arguments"] += tc.function.arguments
```
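Once the stream ends, the accumulated `arguments` strings still need to be parsed before you can call anything. The sketch below shows one way to do that. The helper name `finalize_tool_calls` and the example data (`get_weather`, `get_time`) are hypothetical. It assumes the accumulator has the same shape as `tool_call_chunks` above and marks a call as unusable when its JSON was cut off.

```python
import json


def finalize_tool_calls(tool_call_chunks: dict) -> list:
    """Turn accumulated fragments into complete calls with parsed arguments."""
    calls = []
    for index in sorted(tool_call_chunks):
        entry = tool_call_chunks[index]
        try:
            arguments = json.loads(entry["arguments"]) if entry["arguments"] else {}
        except json.JSONDecodeError:
            arguments = None  # stream ended before the JSON was complete
        calls.append({"id": entry["id"], "name": entry["name"], "arguments": arguments})
    return calls


# Hypothetical accumulated state: one complete call, one truncated call.
example = {
    0: {"id": "call_1", "name": "get_weather", "arguments": '{"city": "Paris"}'},
    1: {"id": "call_2", "name": "get_time", "arguments": '{"tz": "Europe/Par'},
}
calls = finalize_tool_calls(example)
```

Parsing only after the stream finishes avoids repeated `json.loads` failures on fragments that are incomplete by design.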
In a browser, use the fetch API with a ReadableStream:
```javascript
async function streamCompletion(prompt) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message: prompt })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const output = document.getElementById('output');
  let buffer = ''; // holds a trailing partial line between reads

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    // A network chunk can end mid-line, so parse only complete lines
    const lines = buffer.split('\n');
    buffer = lines.pop(); // last element is incomplete (or empty)

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6).trim(); // Remove "data: "
      if (data === '[DONE]') continue;
      try {
        const parsed = JSON.parse(data);
        const text = parsed.choices?.[0]?.delta?.content || parsed.delta?.text || '';
        output.textContent += text;
      } catch (e) {
        /* malformed JSON line; skip it */
      }
    }
  }
}
```
For your backend to forward streaming to the client, use your framework's streaming response support:
```python
# FastAPI example: stream Anthropic response to browser
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
# Async client, so streaming does not block the server's event loop
client = anthropic.AsyncAnthropic()


@app.post("/api/chat")
async def chat(request: dict):
    async def generate():
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=1024,
            messages=[{"role": "user", "content": request["message"]}],
        ) as stream:
            async for text in stream.text_stream:
                payload = json.dumps({"delta": {"text": text}})
                yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
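If your backend emits more than one kind of event, it can help to build the SSE frames in one place. The helper below is a small sketch of that idea. The name `format_sse` is an assumption for illustration and is not part of FastAPI or any SDK. It relies on `json.dumps` escaping newlines inside strings, so each payload stays on a single `data:` line.

```python
import json
from typing import Optional


def format_sse(payload: dict, event: Optional[str] = None) -> str:
    """Serialize one SSE frame: optional `event:` field, a `data:` line, blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps escapes newlines inside strings, so the payload stays on one line.
    lines.append(f"data: {json.dumps(payload)}")
    return "\n".join(lines) + "\n\n"


frame = format_sse({"delta": {"text": "line one\nline two"}})
error_frame = format_sse({"message": "upstream failed"}, event="error")
```

A named `error` event like the one above is one possible way to tell the browser that the upstream call failed partway through, instead of silently closing the connection.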
Errors can occur partway through a stream. The connection might drop, the API might return an error mid-stream, or the model might hit an internal error after beginning generation.
```python
# Initialize before the try block so partial text survives an exception
collected_text = ""
try:
    with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": "..."}]
    ) as stream:
        for text in stream.text_stream:
            collected_text += text
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost during stream")
    # Retry logic or fallback; collected_text holds whatever arrived
except anthropic.RateLimitError:
    print("\nRate limited: implement backoff")
except anthropic.APIStatusError as e:
    print(f"\nAPI error {e.status_code}: {e.message}")
```
For production streaming, always handle partial completions. If the stream terminates early, you might have a useful partial response you can display or log rather than discarding entirely.
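As a rough illustration of keeping partial output, the sketch below wraps any iterable of text chunks and reports whether the stream finished. The names `collect_stream` and `flaky_stream` are hypothetical, and the simulated failure stands in for a real connection drop. In real code you would catch your SDK's specific error types rather than a bare `Exception`.

```python
from typing import Iterable, Tuple


def collect_stream(chunks: Iterable[str]) -> Tuple[str, bool]:
    """Return (text, completed). Text is whatever arrived before any failure."""
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk)
    except Exception:  # narrow this to your SDK's error types in real code
        return "".join(parts), False
    return "".join(parts), True


def flaky_stream():
    """Stand-in for a text stream that drops after two chunks (hypothetical)."""
    yield "Keep-alive reuses "
    yield "one TCP connection"
    raise ConnectionError("simulated drop")


text, completed = collect_stream(flaky_stream())
print(repr(text), completed)
```

The `completed` flag lets the caller decide whether to display the text as-is, mark it as truncated, or retry.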
Two common patterns:
For markdown rendering during streaming, some libraries (like marked) can incrementally render markdown as it arrives, though this requires care around partially-rendered elements.
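One example of the care needed: if the text so far contains an opening code fence but not yet the closing one, a renderer may treat everything after it as code. A possible workaround, sketched below, is to close the fence temporarily before each render. The function `close_open_fences` is an illustrative assumption, not a feature of marked or any other library, and it deliberately ignores edge cases such as tilde fences or fences longer than three backticks.

```python
FENCE = "`" * 3  # three backticks, built this way to keep the example readable


def close_open_fences(partial_markdown: str) -> str:
    """Append a closing fence if the text so far has an unclosed code block."""
    fence_count = sum(
        1 for line in partial_markdown.splitlines() if line.lstrip().startswith(FENCE)
    )
    if fence_count % 2 == 1:
        suffix = "" if partial_markdown.endswith("\n") else "\n"
        return partial_markdown + suffix + FENCE
    return partial_markdown


# Hypothetical snapshot of a response that is still streaming.
snapshot = "Here is the code:\n" + FENCE + "python\nprint(1)"
renderable = close_open_fences(snapshot)
```

You would pass `renderable` to the renderer on each update while keeping the raw `snapshot` as the source of truth, so the temporary fence never ends up in the stored text.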