WebSockets for Real-Time Agent Streaming

Phase 7 of 8: Production Deployment

Watch your agent think in real time! WebSockets give you a persistent, bidirectional channel, so you can stream an agent's responses and intermediate reasoning to the client as they are produced.

Coming from Software Engineering? If you've built real-time features — chat apps, live dashboards, collaborative editing, or notification systems — you already know WebSockets. The pattern here is identical: open a persistent connection, stream data as it becomes available, handle disconnections gracefully. Server-Sent Events (SSE) also works well for one-way streaming. The AI-specific part is streaming token-by-token output, which is just pushing small chunks of text over the socket as the LLM generates them.


Why WebSockets?

Benefits:

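  • Real-time: tokens and status updates reach the client the moment they are produced
  • Bidirectional: either side can send a message at any time, without waiting for the other
  • Efficient: one long-lived connection instead of a new HTTP request per update
  • Better UX: users watch the agent work instead of staring at a spinner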

Basic WebSocket Server

# script_id: day_085_websockets_streaming/basic_websocket_server
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

@app.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    """WebSocket endpoint for agent interaction."""

    await websocket.accept()

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()

            # Stream response back
            async for chunk in stream_agent_response(data):
                await websocket.send_text(chunk)

            # Send completion marker
            await websocket.send_text("[DONE]")

    except WebSocketDisconnect:
        print("Client disconnected")

async def stream_agent_response(prompt: str):
    """Generate streaming response."""

    # Simulate streaming agent response
    response = f"Processing your request: {prompt}. "
    response += "Here's my analysis... "
    response += "And my conclusion."

    for word in response.split():
        yield word + " "
        await asyncio.sleep(0.1)  # Simulate thinking
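
The `[DONE]` marker is an in-band sentinel. The client keeps reading text frames and treats that exact string as the end of the response. Below is a minimal stdlib-only sketch of the consuming side. It assumes `receive` is any coroutine function that returns the next text frame. With the `websockets` client that could be `websocket.recv`; here an `asyncio.Queue` stands in for the socket. `read_until_done` and `demo_sentinel` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List

DONE = "[DONE]"  # must match the server's completion marker exactly

async def read_until_done(receive: Callable[[], Awaitable[str]]) -> str:
    """Collect streamed text frames until the DONE sentinel, then join them."""
    parts: List[str] = []
    while True:
        frame = await receive()
        if frame == DONE:
            return "".join(parts)
        parts.append(frame)

async def demo_sentinel() -> str:
    # An asyncio.Queue stands in for the socket in this sketch
    frames: asyncio.Queue = asyncio.Queue()
    for frame in ["Processing ", "your ", "request ", DONE]:
        frames.put_nowait(frame)
    return await read_until_done(frames.get)

print(repr(asyncio.run(demo_sentinel())))  # 'Processing your request '
```

One caveat applies to any in-band sentinel. If the model ever emits the literal text `[DONE]` as a chunk, the client stops early. The JSON envelope used for agent thoughts below avoids this, because each message carries a `type` field.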

Streaming LLM Responses

Stream actual LLM output:

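# script_id: day_085_websockets_streaming/streaming_llm_responses
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
# The async client keeps the event loop free while waiting on the API,
# so other connections are not blocked (reads OPENAI_API_KEY from the env)
client = AsyncOpenAI()

@app.websocket("/ws/chat")
async def chat_websocket(websocket: WebSocket):
    """Stream LLM responses via WebSocket."""

    await websocket.accept()

    try:
        while True:
            # Get user message
            user_message = await websocket.receive_text()

            # Stream response
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": user_message}],
                stream=True
            )

            async for chunk in stream:
                # Skip chunks with no text (e.g. the role-only first delta)
                if chunk.choices and chunk.choices[0].delta.content:
                    await websocket.send_text(chunk.choices[0].delta.content)

            # Same end-of-response marker as the basic server
            await websocket.send_text("[DONE]")

    except WebSocketDisconnect:
        pass  # client went away; nothing left to send
    except Exception as e: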
        await websocket.send_text(f"[ERROR] {str(e)}")
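
Inside an `async def` handler, any blocking call stalls the event loop, and every other open connection stalls with it. Official SDKs often ship an async client for this reason; the OpenAI Python SDK has `AsyncOpenAI`. If all you have is a synchronous, blocking iterator, one workaround is to pull each item in a worker thread with `asyncio.to_thread` (Python 3.9+).

Below is a minimal stdlib-only sketch. `iterate_in_thread`, `slow_tokens` and `demo_thread_bridge` are hypothetical names, and `slow_tokens` stands in for a real SDK stream.

```python
import asyncio
import contextlib
import time
from typing import AsyncIterator, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")
_END = object()  # private sentinel returned by next() once the iterator is exhausted

async def iterate_in_thread(sync_iterable: Iterable[T]) -> AsyncIterator[T]:
    """Adapt a blocking iterator for `async for` by running each next() in a worker thread."""
    iterator = iter(sync_iterable)
    while True:
        item = await asyncio.to_thread(next, iterator, _END)
        if item is _END:
            return
        yield item

def slow_tokens() -> Iterator[str]:
    """Hypothetical stand-in for a blocking SDK stream."""
    for token in ["Hello", ", ", "world"]:
        time.sleep(0.05)  # simulates a blocking network read
        yield token

async def demo_thread_bridge() -> Tuple[str, int]:
    ticks = 0

    async def heartbeat() -> None:
        # Only advances while the event loop is free to run other tasks
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    text = "".join([token async for token in iterate_in_thread(slow_tokens())])
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return text, ticks

text, ticks = asyncio.run(demo_thread_bridge())
print(text)       # Hello, world
print(ticks > 0)  # True: the loop kept running during the blocking reads
```

With a synchronous SDK, the initial `create(...)` call blocks as well. It can be offloaded the same way, for example `await asyncio.to_thread(client.chat.completions.create, ...)`. Each item costs a thread hop, so prefer a native async client when one exists.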

Streaming Agent Thoughts

Stream the agent's reasoning process:

# script_id: day_085_websockets_streaming/streaming_agent_thoughts
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from dataclasses import dataclass
from typing import AsyncGenerator
import asyncio
import json

app = FastAPI()

@dataclass
class AgentThought:
    type: str  # "thinking", "action", "observation", "answer"
    content: str

async def run_agent_with_thoughts(prompt: str) -> AsyncGenerator[AgentThought, None]:
    """Simulated agent run: yield each reasoning step as it happens."""

    # Thinking
    yield AgentThought("thinking", "Analyzing the question...")
    await asyncio.sleep(0.5)

    yield AgentThought("thinking", "I need to search for information")
    await asyncio.sleep(0.5)

    # Action
    yield AgentThought("action", "Searching: relevant information")
    await asyncio.sleep(1)

    # Observation
    yield AgentThought("observation", "Found: relevant results about the topic")
    await asyncio.sleep(0.5)

    # More thinking
    yield AgentThought("thinking", "Processing the results...")
    await asyncio.sleep(0.5)

    # Final answer
    yield AgentThought("answer", "Based on my research, here's the answer...")

@app.websocket("/ws/agent-thoughts")
async def agent_thoughts_websocket(websocket: WebSocket):
    """Stream agent thoughts via WebSocket."""

    await websocket.accept()

    try:
        while True:
            prompt = await websocket.receive_text()

            async for thought in run_agent_with_thoughts(prompt):
                message = json.dumps({
                    "type": thought.type,
                    "content": thought.content
                })
                await websocket.send_text(message)

            await websocket.send_text(json.dumps({"type": "done"}))

    except WebSocketDisconnect:
        pass
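
Every frame on this endpoint is a JSON object with a `type` field, so server and clients already share a small, implicit schema. Making that schema explicit in one place catches typos on both sides. This is a minimal sketch: `AgentEvent`, `EVENT_TYPES`, `encode_event` and `decode_event` are hypothetical names. Including an `error` type is an assumption borrowed from the error-handling pattern later in this lesson.

```python
import json
from dataclasses import asdict, dataclass

# Every frame type the agent-thoughts endpoint sends, plus an error type
EVENT_TYPES = frozenset({"thinking", "action", "observation", "answer", "done", "error"})

@dataclass
class AgentEvent:
    type: str
    content: str = ""

def encode_event(event: AgentEvent) -> str:
    """Serialize an event, refusing types the client does not know about."""
    if event.type not in EVENT_TYPES:
        raise ValueError(f"unknown event type: {event.type!r}")
    return json.dumps(asdict(event))

def decode_event(raw: str) -> AgentEvent:
    """Parse one text frame; raise ValueError on anything malformed."""
    data = json.loads(raw)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict) or not isinstance(data.get("type"), str):
        raise ValueError(f"malformed event: {raw!r}")
    if data["type"] not in EVENT_TYPES:
        raise ValueError(f"unknown event type: {data['type']!r}")
    return AgentEvent(type=data["type"], content=str(data.get("content", "")))

print(encode_event(AgentEvent("thinking", "Analyzing the question...")))
# {"type": "thinking", "content": "Analyzing the question..."}
print(decode_event('{"type": "done"}'))  # AgentEvent(type='done', content='')
```

Rejecting unknown types on both ends turns a silent UI glitch into an error you can see.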

Connection Manager

Handle multiple clients:

# script_id: day_085_websockets_streaming/connection_manager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import asyncio

class ConnectionManager:
    """Manage WebSocket connections."""

    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        self.active_connections.pop(client_id, None)  # no-op if already gone

    async def send_personal(self, message: str, client_id: str):
        if client_id in self.active_connections:
            await self.active_connections[client_id].send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy: another handler may disconnect (and remove
        # its entry) while we are awaiting a send
        for connection in list(self.active_connections.values()):
            await connection.send_text(message)

manager = ConnectionManager()
app = FastAPI()

async def process_message(data: str):
    """Placeholder agent (swap in your real one): echo the input word by word."""
    for word in data.split():
        yield word + " "
        await asyncio.sleep(0.1)
    yield "[DONE]"

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)

    try:
        while True:
            data = await websocket.receive_text()

            # Process and stream back
            async for chunk in process_message(data):
                await manager.send_personal(chunk, client_id)

    except WebSocketDisconnect:
        manager.disconnect(client_id)
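
The `broadcast` method above awaits each client in turn. One slow client therefore delays everyone after it, and one dead socket raises before the remaining clients are reached. Below is a sketch of a concurrent alternative using `asyncio.gather(..., return_exceptions=True)`, which also prunes clients whose send failed. `broadcast_concurrently`, `TextSocket` and `FakeSocket` are hypothetical names. Treating any exception from `send_text` as "client gone" is an assumption you may want to narrow.

```python
import asyncio
from typing import Dict, List, Protocol

class TextSocket(Protocol):
    """The only WebSocket method this sketch needs (FastAPI's WebSocket has it)."""
    async def send_text(self, data: str) -> None: ...

async def broadcast_concurrently(connections: Dict[str, TextSocket], message: str) -> List[str]:
    """Send `message` to all clients at once; remove and return ids whose send failed."""
    snapshot = list(connections.items())  # copy: the dict may change while we await
    results = await asyncio.gather(
        *(ws.send_text(message) for _, ws in snapshot),
        return_exceptions=True,  # collect exceptions as results instead of raising the first
    )
    failed = [cid for (cid, _), result in zip(snapshot, results)
              if isinstance(result, BaseException)]
    for cid in failed:
        connections.pop(cid, None)
    return failed

class FakeSocket:
    """Test double: records messages, or fails like a closed connection."""
    def __init__(self, alive: bool = True) -> None:
        self.alive = alive
        self.received: List[str] = []

    async def send_text(self, data: str) -> None:
        if not self.alive:
            raise RuntimeError("connection closed")
        self.received.append(data)

clients = {"a": FakeSocket(), "b": FakeSocket(alive=False), "c": FakeSocket()}
print(asyncio.run(broadcast_concurrently(clients, "hello")))  # ['b']
print(sorted(clients))  # ['a', 'c']
```

A slow but still-alive client still holds up the `gather` call as a whole. Wrapping each send in `asyncio.wait_for` with a timeout is one way to bound that delay.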

Client-Side JavaScript

<!DOCTYPE html>
<html>
<head>
    <title>Agent Chat</title>
</head>
<body>
    <div id="chat"></div>
    <input type="text" id="input" placeholder="Type a message...">
    <button onclick="send()">Send</button>

    <script>
        // Connect to the JSON-emitting /ws/agent-thoughts endpoint
        // (/ws/agent sends plain text, which JSON.parse would reject)
        const ws = new WebSocket("ws://localhost:8000/ws/agent-thoughts");
        const chat = document.getElementById("chat");
        const icons = new Map([["thinking", "💭"], ["action", "🔧"], ["observation", "👁️"]]);
        let currentDiv = null;  // element receiving the live answer

        // Append a line via textContent so server or user text is never parsed as HTML
        function addLine(className, text) {
            const div = document.createElement("div");
            div.className = className;
            div.textContent = text;
            chat.appendChild(div);
            return div;
        }

        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);

            if (data.type === "done") {
                // Next answer starts a fresh message element
                currentDiv = null;
            } else if (data.type === "answer") {
                if (!currentDiv) currentDiv = addLine("message", "");
                // Update live
                currentDiv.textContent += data.content;
            } else if (icons.has(data.type)) {
                // Show thinking / action / observation steps
                addLine(data.type, `${icons.get(data.type)} ${data.content}`);
            }
        };

        ws.onopen = function() {
            console.log("Connected!");
        };

        ws.onclose = function() {
            console.log("Disconnected");
        };

        function send() {
            const input = document.getElementById("input");
            if (ws.readyState !== WebSocket.OPEN || !input.value) return;
            ws.send(input.value);
            addLine("user", `You: ${input.value}`);
            input.value = "";
        }
    </script>
</body>
</html>

Python WebSocket Client

# script_id: day_085_websockets_streaming/python_websocket_client
import asyncio
import websockets
import json

async def chat_with_agent():
    """Connect to agent WebSocket and chat."""

    uri = "ws://localhost:8000/ws/agent-thoughts"

    async with websockets.connect(uri) as websocket:
        # Send message
        await websocket.send("What is machine learning?")

        # Receive streaming response
        while True:
            message = await websocket.recv()
            data = json.loads(message)

            if data["type"] == "done":
                break
            elif data["type"] == "thinking":
                print(f"💭 {data['content']}")
            elif data["type"] == "action":
                print(f"🔧 {data['content']}")
            elif data["type"] == "observation":
                print(f"👁️ {data['content']}")
            elif data["type"] == "answer":
                print(f"✅ {data['content']}")

# Run client
asyncio.run(chat_with_agent())
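
The client above ends on the first dropped connection. A common pattern is to reconnect with capped exponential backoff plus random jitter, so that many clients do not all reconnect at the same instant after a server restart. This is a minimal sketch: `run_with_reconnect` and `flaky_session` are hypothetical. Which exceptions count as retryable is an assumption. With the `websockets` library you would likely pass `OSError` (refused or reset connections) and `websockets.exceptions.ConnectionClosed`.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")

async def run_with_reconnect(
    session: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
) -> T:
    """Run `session()`, retrying retryable failures with capped, jittered backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return await session()
        except retry_on:
            if attempt >= max_attempts:
                raise  # give up: surface the last error to the caller
            # With the defaults: 0.5 s, 1 s, 2 s, ... capped at max_delay.
            # "Full jitter" sleeps a random time in [0, delay] to spread clients out.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))
            attempt += 1

attempts = 0

async def flaky_session() -> str:
    # Stand-in for `async with websockets.connect(uri) as ws: ...`
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionRefusedError("server not up yet")
    return "chat finished"

print(asyncio.run(run_with_reconnect(flaky_session, base_delay=0.01)))  # chat finished
print(attempts)  # 3
```

To wrap the client above, pass the function itself, for example `asyncio.run(run_with_reconnect(chat_with_agent, retry_on=(OSError, websockets.exceptions.ConnectionClosed)))`. A retried session re-sends its question, so the server may process it twice.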

Error Handling

# script_id: day_085_websockets_streaming/error_handling
import asyncio
import json
import traceback

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

async def process(data: str):
    """Placeholder for the real agent; yields response chunks."""
    yield f"Echo: {data}"

@app.websocket("/ws/robust")
async def robust_websocket(websocket: WebSocket):
    """WebSocket with robust error handling."""

    await websocket.accept()

    try:
        while True:
            try:
                data = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=60.0  # 60 second timeout
                )

                async for chunk in process(data):
                    await websocket.send_text(chunk)

            except asyncio.TimeoutError:
                # No message for 60 s: send an application-level ping
                await websocket.send_text(json.dumps({"type": "ping"}))

            except WebSocketDisconnect:
                # WebSocketDisconnect is an Exception subclass; re-raise it so the
                # generic handler below does not try to send on a closed socket
                raise

            except Exception as e:
                # Report the failure and keep the connection open
                await websocket.send_text(json.dumps({
                    "type": "error",
                    "message": str(e)
                }))

    except WebSocketDisconnect:
        print("Client disconnected normally")
    except Exception as e:
        print(f"WebSocket error: {traceback.format_exc()}")
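
Sending `str(e)` straight to the client, as the handler above does, can leak internal details such as file paths, SQL fragments, or credentials embedded in URLs. One alternative is to log the full traceback server-side under a short correlation id. The client then receives only a generic message plus that id. This is a minimal stdlib sketch; `client_error_event` and the `error_id` field are hypothetical, not part of any FastAPI API.

```python
import json
import logging
import uuid

logger = logging.getLogger("agent.websocket")

def client_error_event(exc: BaseException) -> str:
    """Log the full exception server-side; return a JSON frame safe to show users."""
    error_id = uuid.uuid4().hex[:8]  # short id a user can quote in a bug report
    logger.error("WebSocket handler failed [error_id=%s]", error_id, exc_info=exc)
    return json.dumps({
        "type": "error",
        "message": "The agent hit an internal error. Please try again.",
        "error_id": error_id,
    })

frame = client_error_event(RuntimeError("db://admin:hunter2@10.0.0.5 refused"))
print("hunter2" in frame)  # False
```

In the handler, the inner `except Exception as e:` branch would then send `await websocket.send_text(client_error_event(e))`.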

Authentication

Secure your WebSocket:

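# script_id: day_085_websockets_streaming/websocket_auth
import os

import jwt  # PyJWT
from fastapi import FastAPI, Query, WebSocket

app = FastAPI()
# Load the signing key from configuration instead of hard-coding it
# (JWT_SECRET is an illustrative variable name)
JWT_SECRET = os.environ["JWT_SECRET"]

async def get_current_user(token: str):
    """Validate a JWT; return its user_id, or None if the token is invalid."""
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except jwt.InvalidTokenError:  # bad signature, expired, malformed, ...
        return None
    return payload.get("user_id")

@app.websocket("/ws/secure")
async def secure_websocket(
    websocket: WebSocket,
    # Browsers cannot set custom headers on a WebSocket handshake, so the
    # token rides in the query string; keep it short-lived (URLs get logged)
    token: str = Query(...)
):
    """Authenticated WebSocket endpoint."""

    user_id = await get_current_user(token)
    if not user_id:
        # close() before accept() rejects the handshake with HTTP 403 and the
        # 4001 code never reaches the client; accept first so it does
        await websocket.accept()
        await websocket.close(code=4001, reason="Invalid token")
        return

    await websocket.accept()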
    # ... rest of handler


Quick Reference

# script_id: day_085_websockets_streaming/quick_reference
# Server
@app.websocket("/ws")
async def ws(websocket: WebSocket):
    await websocket.accept()
    data = await websocket.receive_text()
    await websocket.send_text("response")

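# Stream LLM (AsyncOpenAI, stream=True); some chunks carry no text
async for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        await websocket.send_text(chunk.choices[0].delta.content)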

# Client (JavaScript)
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (e) => console.log(e.data);
ws.send("Hello");

# Client (Python)
async with websockets.connect(uri) as ws:
    await ws.send("Hello")
    response = await ws.recv()

What's Next?

Now let's build beautiful agent UIs with Streamlit and Gradio!