Coming from Software Engineering? Building streaming chat interfaces is like building a real-time log viewer or terminal emulator — data arrives in chunks and you render progressively. The same buffering and display strategies you'd use for a live tail -f view apply here.
Building a Streaming Chat Interface
Here's a complete streaming chat implementation:
# script_id: day_013_streaming_responses_part2/streaming_chat_interface
from openai import OpenAI
client = OpenAI()
class StreamingChat:
    """A console chat session that streams each assistant reply as it arrives.

    The conversation so far (user and assistant turns, without the system
    prompt) is kept in ``self.messages`` and resent with every request.
    """

    def __init__(self, system_prompt: str = "You are a helpful assistant."):
        """Start an empty conversation governed by ``system_prompt``."""
        self.system_prompt = system_prompt
        self.messages = []

    def chat(self, user_input: str) -> str:
        """Send a message, print the reply as it streams, and return it.

        Args:
            user_input: The user's message for this turn.

        Returns:
            The complete assistant reply as one string.

        Raises:
            Any exception raised by the API client. The history is only
            updated after the reply has been fully received, so a failed
            turn leaves ``self.messages`` exactly as it was.
        """
        user_message = {"role": "user", "content": user_input}

        # The system prompt is prepended per request instead of being stored
        # in the history, so clear_history() can never drop it.
        full_messages = [
            {"role": "system", "content": self.system_prompt},
            *self.messages,
            user_message,
        ]

        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=full_messages,
            stream=True,
        )

        print("Assistant: ", end="", flush=True)
        collected = []
        try:
            for chunk in stream:
                # A chunk can arrive with an empty `choices` list (the API
                # reference documents this for the usage chunk); skip it
                # instead of failing on choices[0].
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    print(content, end="", flush=True)
                    collected.append(content)
        finally:
            print()  # End the line even if the stream was interrupted

        full_response = "".join(collected)

        # Commit both turns together: recording the user turn before the
        # request would leave an unanswered message in the history whenever
        # the request or the stream fails.
        self.messages.append(user_message)
        self.messages.append({"role": "assistant", "content": full_response})
        return full_response

    def clear_history(self):
        """Clear conversation history (the system prompt is kept)."""
        self.messages = []
# Interactive usage
def run_chat():
    """Run an interactive console chat until the user leaves.

    Typing ``quit`` exits, ``clear`` resets the conversation history, and an
    empty line is ignored. Ctrl-D (end of input) or Ctrl-C at the prompt also
    exits cleanly instead of ending the program with a traceback.
    """
    chat = StreamingChat(
        system_prompt="You are a friendly coding tutor. Keep responses concise."
    )

    print("Streaming Chat (type 'quit' to exit, 'clear' to reset)")
    print("-" * 50)

    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # input() raises these on Ctrl-D / Ctrl-C; treat both as "quit".
            print()
            break

        command = user_input.lower()
        if command == 'quit':
            break
        if command == 'clear':
            chat.clear_history()
            print("History cleared!")
            continue
        if not user_input:
            continue

        chat.chat(user_input)


if __name__ == "__main__":
    run_chat()
Streaming with Callbacks
Use callbacks when a framework or UI owns the render loop and you only hand it functions to call. This is the same pattern as a Node stream — you hand the streamer three functions (on each text piece, on complete, on error) and it calls them as data arrives.
# script_id: day_013_streaming_responses_part2/stream_with_callbacks
from openai import OpenAI
from typing import Callable
client = OpenAI()
def stream_with_callbacks(
    prompt: str,
    on_token: Callable[[str], None],
    on_complete: Callable[[str], None],
    on_error: Callable[[Exception], None] = None
):
    """
    Stream with callback functions for integration with UIs.

    Args:
        prompt: The user's prompt
        on_token: Called for each text piece (chunk) as it arrives
        on_complete: Called once with the full text when streaming finishes
        on_error: Called with the exception if anything fails, including an
            exception raised by on_token or on_complete themselves. When it
            is omitted, the exception propagates to the caller instead.
    """
    try:
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        collected = []
        for chunk in stream:
            # A chunk can arrive with an empty `choices` list (the API
            # reference documents this for the usage chunk); skip it rather
            # than reporting an IndexError through on_error.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if content:
                collected.append(content)
                on_token(content)

        on_complete("".join(collected))

    except Exception as e:
        # Explicit None check: "no handler supplied" is the only case that
        # should re-raise.
        if on_error is None:
            raise
        on_error(e)
# Example usage with simple callbacks
def my_token_handler(token: str):
    """Echo one streamed piece right away, staying on the current line."""
    print(token, flush=True, end="")


def my_complete_handler(full_text: str):
    """Report the size of the finished response."""
    length = len(full_text)
    print(f"\n\n[Complete! {length} chars]")


def my_error_handler(error: Exception):
    """Show the failure on a line of its own."""
    print(f"\n[Error: {error}]")


# Run a short request through the three handlers, passed in the order
# stream_with_callbacks declares them: token, complete, error.
stream_with_callbacks(
    "Write a 2-line poem",
    my_token_handler,
    my_complete_handler,
    my_error_handler,
)
Server-Sent Events (SSE) for Web Apps
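Use SSE when the client is a browser talking to your HTTP endpoint: the server holds one HTTP response open and writes each chunk to it as a `data:` event, which the browser's EventSource API hands to your page as it arrives: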
# script_id: day_013_streaming_responses_part2/sse_fastapi_endpoint
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from openai import OpenAI
app = FastAPI()
# Async client: the generator below runs on the server's event loop, so
# waiting for the next chunk must be awaited. Iterating the synchronous
# client inside `async def` would stall every other request in between.
client = AsyncOpenAI()


async def generate_stream(prompt: str):
    """Yield the model's reply as Server-Sent Events.

    Each text piece is sent as ``data: {"content": ...}`` followed by a blank
    line; after the last piece a final ``data: {"done": true}`` event tells
    the browser to stop listening.

    Args:
        prompt: The user's prompt.
    """
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    try:
        async for chunk in stream:
            # A chunk can arrive with an empty `choices` list (the API
            # reference documents this for the usage chunk); skip it.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if content:
                # SSE format: data: {json}\n\n
                data = json.dumps({"content": content})
                yield f"data: {data}\n\n"
    finally:
        # Also runs when the generator is closed early (for example the
        # browser went away): close the upstream stream instead of leaving
        # it open.
        await stream.close()

    # Send completion signal
    yield f"data: {json.dumps({'done': True})}\n\n"


@app.get("/stream")
async def stream_endpoint(prompt: str):
    """SSE endpoint: stream the reply to the ``prompt`` query parameter."""
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )
# Frontend JavaScript to consume this:
"""
const eventSource = new EventSource('/stream?prompt=Hello');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.done) {
eventSource.close();
} else {
document.getElementById('output').textContent += data.content;
}
};
"""
Handling Stream Interruption
Allow users to cancel streams:
# script_id: day_013_streaming_responses_part2/cancellable_stream
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()
class CancellableStream:
    """A stream that can be cancelled.

    Cancellation is cooperative: cancel() only sets a flag, and stream()
    checks that flag each time a chunk arrives.
    """

    def __init__(self):
        # Set by cancel(); read by stream() before handling each chunk.
        self.cancelled = False

    def cancel(self):
        """Cancel the stream."""
        self.cancelled = True

    async def stream(self, prompt: str) -> str:
        """Stream with cancellation support.

        Args:
            prompt: The user's prompt.

        Returns:
            The text received before the stream ended or was cancelled.
        """
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        collected = []
        try:
            async for chunk in stream:
                if self.cancelled:
                    print("\n[Stream cancelled]")
                    break

                # A chunk can arrive with an empty `choices` list (the API
                # reference documents this for the usage chunk); skip it.
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    print(content, end="", flush=True)
                    collected.append(content)
        finally:
            # Breaking out of the loop only stops US from reading more chunks —
            # to actually stop the work (and the billing) on the provider side,
            # close the stream. Doing it in `finally` also covers an exception
            # mid-stream and cancellation of the surrounding task.
            await stream.close()

        return "".join(collected)
async def demo_cancellation():
    """Start a long generation, interrupt it after two seconds, report what arrived."""
    session = CancellableStream()

    # Run the request as a background task so this coroutine stays free to
    # interrupt it.
    background = asyncio.create_task(
        session.stream("Write a very long story about a dragon")
    )

    # Give it two seconds of output, then ask it to stop.
    await asyncio.sleep(2)
    session.cancel()

    # The task still returns normally, with whatever text it had collected.
    partial_text = await background
    received = len(partial_text)
    print(f"\n\nGot {received} chars before cancellation")


asyncio.run(demo_cancellation())
Measuring Streaming Performance
# script_id: day_013_streaming_responses_part2/measure_streaming_performance
import time
from openai import OpenAI
client = OpenAI()
def measure_streaming(prompt: str) -> dict:
    """Measure streaming performance metrics.

    Args:
        prompt: The user's prompt.

    Returns:
        A dict with these keys (all times in seconds):
            total_time: From just before the request to the end of the stream.
            time_to_first_token: From just before the request to the first
                chunk that carried text, or None if no text arrived.
            streaming_time: From that first chunk to the end of the stream,
                or None if no text arrived.
            chunk_count: Number of chunks that carried text.
            char_count: Total characters received.
            chars_per_second: char_count / streaming_time, or 0 when that
                rate is undefined (no text, or no measurable time elapsed).
    """
    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() follows the wall clock, which can be adjusted
    # while the stream is running.
    start_time = time.perf_counter()
    first_token_time = None
    chunk_count = 0  # chunks, not tokens — a chunk may carry several characters or none (see Day 012)
    char_count = 0

    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        # A chunk can arrive with an empty `choices` list (the API reference
        # documents this for the usage chunk); skip it.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        if content:
            if first_token_time is None:
                first_token_time = time.perf_counter()
            chunk_count += 1
            char_count += len(content)

    end_time = time.perf_counter()

    if first_token_time is None:
        time_to_first_token = None
        streaming_time = None
    else:
        time_to_first_token = first_token_time - start_time
        streaming_time = end_time - first_token_time

    return {
        "total_time": end_time - start_time,
        "time_to_first_token": time_to_first_token,
        "streaming_time": streaming_time,
        "chunk_count": chunk_count,
        "char_count": char_count,
        # streaming_time is None when no text arrived and can be 0.0 when the
        # only text chunk was also the last thing read; report 0 in both cases
        # instead of dividing by zero.
        "chars_per_second": char_count / streaming_time if streaming_time else 0
    }
# Test it
metrics = measure_streaming("Write a paragraph about streaming APIs")

# Print one "name: value" line per metric; floats are shown with three
# decimals, everything else (counts, None) as-is.
print("\n--- Streaming Metrics ---")
for key, value in metrics.items():
    shown = f"{value:.3f}" if isinstance(value, float) else f"{value}"
    print(f"{key}: {shown}")
Checkpoint
A model writes its answer one piece at a time, front to back — so the first words are ready almost immediately while the full answer takes much longer. Streaming forwards each piece as it is produced instead of waiting for the last. Time-to-first-token measures that head start.
Run measure_streaming and confirm: it reports a time-to-first-token that's noticeably smaller than the total completion time — that gap is exactly the latency win streaming buys you. If first-token time equals total time, check that you're timing inside the chunk loop (stamping the first chunk's arrival) rather than after the stream has fully drained.
Summary
Quick Reference
# script_id: day_013_streaming_responses_part2/quick_reference
# OpenAI Streaming
from openai import OpenAI
client = OpenAI()
# Ask for a streamed completion, then print each text piece as it arrives.
stream = client.chat.completions.create(
    stream=True,
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hi"}],
)
for chunk in stream:
    piece = chunk.choices[0].delta.content
    if piece:  # skip chunks whose delta carries no text
        print(piece, end="")
# Anthropic Streaming
from anthropic import Anthropic
client = Anthropic()
# The stream is opened as a context manager; iterate its text_stream and
# print each piece as it arrives.
with client.messages.stream(
    messages=[{"role": "user", "content": "Hi"}],
    max_tokens=1024,
    model="claude-sonnet-4-6",
) as stream:
    for text in stream.text_stream:
        print(text, end="")
Exercises
- Typing Effect: Implement a "typewriter" effect that adds a small delay between chunks for a more natural feel
- Progress Estimator: Build a system that estimates completion percentage based on expected output length
- Stream Merger: Create a function that streams from multiple prompts simultaneously and merges the output
Solutions (approaches)
- Typing Effect: Add `time.sleep(0.02)` inside the chunk loop, right before you print each piece — the small pause between chunks gives the natural typewriter feel.
- Progress Estimator: Track a running `char_count` and divide it by an expected output length you pass in; clamp the ratio to 100% so a longer-than-expected answer doesn't overshoot.
- Stream Merger: Run each prompt as its own `asyncio` task and merge their output — either with `asyncio.as_completed`, which hands you each finished response in completion order, or, to interleave pieces as they arrive, by having each task push pieces onto a shared `asyncio.Queue` that one consumer drains.
What's Next?
Tomorrow (Day 14) is Structured Output & Data Parsing — using Pydantic to force LLMs into valid, typed JSON instead of free-form text.