Streaming Responses

How streaming works, why it matters for UX, and how to implement it in your app.

Why Streaming Changes the User Experience

Without streaming, your user submits a prompt and stares at a spinner for 5-15 seconds while the model generates the full response. With streaming, text starts appearing within half a second and keeps coming. The total time is the same, but the perceived experience is dramatically better.

This isn't just aesthetics. Research on user interfaces consistently shows that progressive feedback — even if slower overall — is preferred to long waits followed by instant output. For AI applications, streaming is essentially non-negotiable for production quality.

How SSE Works

Streaming in AI APIs typically uses Server-Sent Events (SSE), a simple protocol in which the server keeps an HTTP connection open, serves it with the text/event-stream content type, and writes each event as one or more data: lines followed by a blank line. The client reads the events as they arrive.

A streaming response looks like this at the wire level:

```
data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello"}}

data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": ", how"}}

data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": " can I"}}

data: {"type": "message_stop"}
```

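Each data: line carries a JSON object, and most of them contain a chunk of the response. Your code reads these events in a loop and processes each chunk as it arrives. How the stream ends depends on the provider: Anthropic sends a message_stop event, while OpenAI's Chat Completions stream ends with a literal data: [DONE] line, which is not JSON.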
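To make the framing concrete, here is a minimal sketch of an SSE reader that works on an iterable of text lines. The function names iter_sse_data and collect_text are made up for this illustration, and the sketch ignores fields other than data: (such as event: or id:), so treat it as a teaching aid rather than a full parser. In practice the SDKs shown in the next sections do this for you.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each SSE event from an iterable of text lines.

    Consecutive data: lines belong to one event and are joined with newlines;
    a blank line ends the event. Comment lines (starting with ":") and other
    fields are ignored in this sketch.
    """
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if buffer:
                yield "\n".join(buffer)
                buffer = []
            continue
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            buffer.append(value)
    if buffer:  # tolerate input that ends without a trailing blank line
        yield "\n".join(buffer)


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate the text deltas from content_block_delta events."""
    parts = []
    for payload in iter_sse_data(lines):
        if payload == "[DONE]":  # OpenAI-style sentinel, not JSON
            break
        event = json.loads(payload)
        if event.get("type") == "content_block_delta":
            parts.append(event["delta"].get("text", ""))
    return "".join(parts)


sample = [
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello"}}',
    "",
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": ", how"}}',
    "",
    "data: [DONE]",
    "",
]
print(collect_text(sample))  # Hello, how
```

Buffering data: lines until a blank line arrives is what lets one event span several lines, which is why the reader does not parse each line in isolation.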

Streaming with the Anthropic SDK

```python
import anthropic

client = anthropic.Anthropic()

# Simple text streaming
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain how HTTP keep-alive works."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # newline after stream ends

# Access the final message after streaming (same arguments as above)
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain how HTTP keep-alive works."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    # The accumulated message, including token usage, is available once the stream is consumed
    final_message = stream.get_final_message()

total = final_message.usage.input_tokens + final_message.usage.output_tokens
print(f"\nTotal tokens: {total}")
```

For full event handling (useful for tool use in streaming):

```python
# Reuses `client` from the previous example; `match` requires Python 3.10+
with client.messages.stream(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "..."}],
) as stream:
    for event in stream:
        match event.type:
            case "content_block_delta":
                # Text arrives as text_delta; other delta types carry tool input
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
            case "message_stop":
                print("\nStream complete")
```

Streaming with the OpenAI SDK

```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a short poem about APIs."}],
    stream=True,
)

for chunk in stream:
    # Some chunks (for example a trailing usage chunk) can have no choices
    if not chunk.choices:
        continue
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="", flush=True)
    if choice.finish_reason:
        print(f"\nFinish reason: {choice.finish_reason}")
```

For tool calls in streaming with OpenAI, you receive delta chunks that build up the tool call arguments:

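```python
# `stream` comes from a chat.completions.create(..., stream=True) call that passes tools
tool_call_chunks = {}

for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    if not delta.tool_calls:
        continue
    for tc in delta.tool_calls:
        # Fragments of the same tool call share an index
        if tc.index not in tool_call_chunks:
            tool_call_chunks[tc.index] = {"id": "", "name": "", "arguments": ""}
        if tc.id:
            tool_call_chunks[tc.index]["id"] += tc.id
        # `function` may be absent on some fragments
        if tc.function and tc.function.name:
            tool_call_chunks[tc.index]["name"] += tc.function.name
        if tc.function and tc.function.arguments:
            tool_call_chunks[tc.index]["arguments"] += tc.function.arguments
```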
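The accumulated arguments string only becomes valid JSON once the stream has finished, so parsing belongs after the loop. The sketch below shows one way to do that. The helper names accumulate_tool_calls and finalize_tool_calls are hypothetical, and the fragments are hand-built stand-ins for the entries of delta.tool_calls rather than real SDK objects.

```python
import json
from types import SimpleNamespace as NS


def accumulate_tool_calls(deltas):
    """Merge streamed tool-call fragments, keyed by their index."""
    calls = {}
    for tc in deltas:
        slot = calls.setdefault(tc.index, {"id": "", "name": "", "arguments": ""})
        if tc.id:
            slot["id"] += tc.id
        if tc.function and tc.function.name:
            slot["name"] += tc.function.name
        if tc.function and tc.function.arguments:
            slot["arguments"] += tc.function.arguments
    return calls


def finalize_tool_calls(calls):
    """Parse each accumulated argument string as JSON after the stream ends."""
    finished = []
    for index in sorted(calls):
        call = calls[index]
        finished.append({
            "id": call["id"],
            "name": call["name"],
            # An empty string is treated as "no arguments" in this sketch
            "arguments": json.loads(call["arguments"] or "{}"),
        })
    return finished


# Illustrative fragments: the name arrives first, the JSON arguments in pieces.
fragments = [
    NS(index=0, id="call_1", function=NS(name="get_weather", arguments="")),
    NS(index=0, id=None, function=NS(name=None, arguments='{"city": ')),
    NS(index=0, id=None, function=NS(name=None, arguments='"Paris"}')),
]
tool_calls = finalize_tool_calls(accumulate_tool_calls(fragments))
print(tool_calls)
```

If the stream is cut off mid-call, json.loads raises json.JSONDecodeError, which is a reasonable signal to discard that tool call rather than run it with incomplete arguments.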

Streaming in a Web App

In a browser, use the fetch API with a ReadableStream:

```javascript
async function streamCompletion(prompt) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message: prompt })
  });
  if (!response.ok) throw new Error(`Request failed: ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const output = document.getElementById('output');
  let buffer = ''; // holds a trailing partial line between reads

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    // A network chunk can end mid-line, so only parse complete lines
    const lines = buffer.split('\n');
    buffer = lines.pop(); // keep the incomplete tail for the next read

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6); // Remove "data: "
      if (data === '[DONE]') continue;
      try {
        const parsed = JSON.parse(data);
        const text = parsed.choices?.[0]?.delta?.content || parsed.delta?.text || '';
        output.textContent += text;
      } catch (e) {
        /* skip lines that are not valid JSON */
      }
    }
  }
}
```

For your backend to forward streaming to the client, use your framework's streaming response support:

```python
# FastAPI example: stream Anthropic response to browser
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
# Async client, so iterating the stream does not block the event loop
client = anthropic.AsyncAnthropic()


@app.post("/api/chat")
async def chat(request: dict):
    async def generate():
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=1024,
            messages=[{"role": "user", "content": request["message"]}],
        ) as stream:
            async for text in stream.text_stream:
                payload = json.dumps({"delta": {"text": text}})
                yield f"data: {payload}\n\n"
        # Sentinel the browser code above checks for
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```

Error Handling During Streams

Errors can occur partway through a stream. The connection might drop, the API might return an error mid-stream, or the model might hit an internal error after beginning generation.

```python
import anthropic

client = anthropic.Anthropic()
collected_text = ""  # defined before the try so partial output survives an error

try:
    with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": "..."}],
    ) as stream:
        for text in stream.text_stream:
            collected_text += text
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost during stream")
    # Retry logic or fallback
except anthropic.RateLimitError:
    # Must come before APIStatusError, which is its parent class
    print("\nRate limited; implement backoff")
except anthropic.APIStatusError as e:
    print(f"\nAPI error {e.status_code}: {e.message}")
```
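The retry and backoff steps mentioned in the handlers above could be sketched roughly as follows. The helper retry_with_backoff and its parameters are hypothetical, not part of either SDK, and the demonstration uses the built-in ConnectionError instead of a real API call so that it runs without network access.

```python
import random
import time


def retry_with_backoff(operation, retryable, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call operation(); on a retryable exception, wait and try again.

    The wait doubles on each attempt (base_delay, 2 * base_delay, ...) with up
    to 10% random jitter added. The final failure is re-raised to the caller.
    """
    for attempt in range(max_attempts):
        try:
            return operation()
        except retryable:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 10))


# Demonstration with a simulated flaky operation that succeeds on the third try.
attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("simulated drop")
    return "ok"


waits = []  # record the delays instead of actually sleeping
result = retry_with_backoff(flaky, retryable=(ConnectionError,), sleep=waits.append)
print(result, len(attempts))  # ok 3
```

With the Anthropic SDK, the retryable tuple would plausibly be (anthropic.APIConnectionError, anthropic.RateLimitError). Keep in mind that retrying a stream sends the whole request again, so any text already shown to the user has to be cleared or replaced.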

For production streaming, always handle partial completions. If the stream terminates early, you might have a useful partial response you can display or log rather than discarding entirely.
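One way to keep a partial completion is to wrap the consuming loop so that it always returns what it has collected, together with a flag that says whether the stream finished. This is a sketch under assumptions: StreamResult and consume_stream are invented names, the stream is simulated with a generator, and real code would catch the SDK's specific error types instead of Exception.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional


@dataclass
class StreamResult:
    text: str                           # everything received before the stream ended
    complete: bool                      # False if the stream terminated early
    error: Optional[Exception] = None   # the exception that cut the stream short, if any


def consume_stream(chunks: Iterable[str], on_text: Callable[[str], None]) -> StreamResult:
    """Forward each chunk to on_text and keep the text even if the stream fails."""
    collected = []
    try:
        for text in chunks:
            collected.append(text)
            on_text(text)
    except Exception as exc:  # narrow this to the SDK's error types in real code
        return StreamResult("".join(collected), complete=False, error=exc)
    return StreamResult("".join(collected), complete=True)


def dropping_stream() -> Iterator[str]:
    """Simulated stream that fails after two chunks."""
    yield "Keep-alive reuses "
    yield "a TCP connection"
    raise ConnectionError("simulated mid-stream drop")


result = consume_stream(dropping_stream(), on_text=lambda text: None)
print(result.complete, repr(result.text))  # False 'Keep-alive reuses a TCP connection'
```

The caller can then decide what to do with an incomplete result: show it with a "response interrupted" notice, log it, or offer a retry.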

What to Do With Partial Responses in Your UI

Two common patterns:

  1. Append to a text buffer: The simplest approach. Each chunk appends to a string that's rendered in real time. Works for plain text responses.
  2. Parse when complete: For structured output (JSON, markdown), collect the full stream, then parse. Don't try to parse JSON as it streams; the accumulated text is not valid JSON until the last chunk arrives.
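The second pattern can be sketched in a few lines. The helper name parse_streamed_json is hypothetical and the chunks are hard-coded stand-ins for streamed text. The example also parses a prefix of the stream to show why parsing early fails.

```python
import json


def parse_streamed_json(chunks):
    """Join all chunks, then parse once. Returns (value, None) or (None, error)."""
    raw = "".join(chunks)
    try:
        return json.loads(raw), None
    except json.JSONDecodeError as exc:
        return None, exc


chunks = ['{"status": ', '"ok", "items"', ": [1, 2, 3]}"]

# Parsing only the first two chunks fails because the object is not closed yet.
_, early_error = parse_streamed_json(chunks[:2])
value, error = parse_streamed_json(chunks)
print(early_error is not None, value)  # True {'status': 'ok', 'items': [1, 2, 3]}
```

Returning the error instead of raising it makes it easy to fall back to displaying the raw text when a stream ends early.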

For markdown rendering during streaming, a common approach is to re-render the accumulated buffer with a library such as marked each time a chunk arrives. This requires care around partially rendered elements, such as a code fence or link that has been opened but not yet closed.
