The most common multi-agent pattern: one agent delegates tasks to specialized workers. This guide shows you how to build hierarchical agent systems.
Coming from Software Engineering? This is the task queue pattern you've used a hundred times. The supervisor is a job dispatcher (think Celery beat or a Kubernetes Job controller), and workers are specialized consumers. You already understand fan-out, result aggregation, worker health checks, and retry on failure. The only twist: the "dispatcher logic" is an LLM deciding which worker to route to, rather than hard-coded routing rules.
The Supervisor Pattern
The supervisor:
- Receives the user's request
- Decides which worker(s) to use
- Delegates tasks to workers
- Combines results
- Returns final answer
Basic Supervisor Implementation
# script_id: day_051_supervisor_worker/supervisor_system
from openai import OpenAI
import json
from typing import Literal
client = OpenAI()
# Define workers
def research_worker(query: str) -> str:
"""Worker that does research."""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a research specialist. Provide factual, well-researched information."},
{"role": "user", "content": query}
]
)
return response.choices[0].message.content
def writing_worker(task: str) -> str:
"""Worker that writes content."""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a professional writer. Create engaging, well-structured content."},
{"role": "user", "content": task}
]
)
return response.choices[0].message.content
def code_worker(task: str) -> str:
"""Worker that writes code."""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an expert programmer. Write clean, efficient code."},
{"role": "user", "content": task}
]
)
return response.choices[0].message.content
# Supervisor
def supervisor(user_request: str) -> str:
"""Supervisor that routes to appropriate workers."""
# Decide which worker to use
routing_response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """You are a task router. Analyze the user's request and decide which worker should handle it.
Workers available:
- research: For factual questions, information gathering
- writing: For content creation, essays, articles
- code: For programming tasks, code writing
Respond with JSON: {"worker": "research|writing|code", "task": "specific task for worker"}"""},
{"role": "user", "content": user_request}
],
response_format={"type": "json_object"}
)
routing = json.loads(routing_response.choices[0].message.content)
worker_name = routing["worker"]
task = routing["task"]
print(f"Supervisor delegating to: {worker_name}")
print(f"Task: {task}")
# Execute worker
workers = {
"research": research_worker,
"writing": writing_worker,
"code": code_worker
}
result = workers[worker_name](task)
return result
# Usage
result = supervisor("Write a Python function to calculate prime numbers")
print(result)
Multi-Worker Delegation
Sometimes a task needs multiple workers:
# script_id: day_051_supervisor_worker/supervisor_system
def supervisor_multi(user_request: str) -> str:
"""Supervisor that can delegate to multiple workers."""
# Analyze and create execution plan
plan_response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """Analyze this request and create an execution plan.
Workers available:
- research: Information gathering
- writing: Content creation
- code: Programming
Return JSON:
{
"steps": [
{"worker": "...", "task": "...", "depends_on": null},
{"worker": "...", "task": "...", "depends_on": 0}
]
}
Steps can depend on previous steps (by index)."""},
{"role": "user", "content": user_request}
],
response_format={"type": "json_object"}
)
plan = json.loads(plan_response.choices[0].message.content)
results = []
# Execute plan
for i, step in enumerate(plan["steps"]):
print(f"\nStep {i + 1}: {step['worker']} - {step['task'][:50]}...")
# Include previous results if this step depends on them
task = step["task"]
if step.get("depends_on") is not None:
prev_result = results[step["depends_on"]]
task = f"{task}\n\nContext from previous step:\n{prev_result}"
# Execute worker
workers = {"research": research_worker, "writing": writing_worker, "code": code_worker}
result = workers[step["worker"]](task)
results.append(result)
# Combine results
final_response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Combine these worker results into a coherent final response."},
{"role": "user", "content": f"Original request: {user_request}\n\nResults:\n" + "\n---\n".join(results)}
]
)
return final_response.choices[0].message.content
# Usage
result = supervisor_multi("Research the latest AI trends and write a blog post with code examples")
print(result)
LangGraph Supervisor Implementation
# script_id: day_051_supervisor_worker/supervisor_system
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Literal, List
from operator import add
class SupervisorState(TypedDict):
messages: Annotated[List, add]
task: str
next_worker: str
worker_results: Annotated[List, add]
final_result: str
def supervisor_node(state: SupervisorState) -> dict:
"""Supervisor decides next action."""
task = state["task"]
results = state.get("worker_results", [])
# Check if we have enough results
if len(results) >= 2: # Example: after 2 workers
return {"next_worker": "synthesize"}
# Decide next worker
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Decide the next worker: research, writing, code, or synthesize if done."},
{"role": "user", "content": f"Task: {task}\nCompleted: {len(results)} steps"}
]
)
next_worker = response.choices[0].message.content.strip().lower()
return {"next_worker": next_worker, "messages": [f"Routing to: {next_worker}"]}
def research_node(state: SupervisorState) -> dict:
result = research_worker(state["task"])
return {"worker_results": [f"Research: {result}"], "messages": ["Research complete"]}
def writing_node(state: SupervisorState) -> dict:
result = writing_worker(state["task"])
return {"worker_results": [f"Writing: {result}"], "messages": ["Writing complete"]}
def code_node(state: SupervisorState) -> dict:
result = code_worker(state["task"])
return {"worker_results": [f"Code: {result}"], "messages": ["Code complete"]}
def synthesize_node(state: SupervisorState) -> dict:
"""Combine all results."""
results = state["worker_results"]
combined = "\n\n".join(results)
return {"final_result": combined, "messages": ["Synthesis complete"]}
def route_to_worker(state: SupervisorState) -> Literal["research", "writing", "code", "synthesize"]:
return state["next_worker"]
# Build graph
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("research", research_node)
workflow.add_node("writing", writing_node)
workflow.add_node("code", code_node)
workflow.add_node("synthesize", synthesize_node)
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges(
"supervisor",
route_to_worker,
{
"research": "research",
"writing": "writing",
"code": "code",
"synthesize": "synthesize"
}
)
# Workers return to supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("writing", "supervisor")
workflow.add_edge("code", "supervisor")
workflow.add_edge("synthesize", END)
app = workflow.compile()
Parallel Worker Execution
Run workers in parallel for speed:
# script_id: day_051_supervisor_worker/parallel_workers
import asyncio
from typing import List, Dict
async def async_research_worker(query: str) -> str:
"""Async research worker."""
# In real code, use async OpenAI client
return f"Research result for: {query}"
async def async_writing_worker(task: str) -> str:
"""Async writing worker."""
return f"Writing result for: {task}"
async def parallel_supervisor(request: str) -> Dict:
"""Execute multiple workers in parallel."""
# Determine which workers to use
plan = [
{"worker": "research", "task": f"Research: {request}"},
{"worker": "writing", "task": f"Write about: {request}"}
]
# Create tasks
tasks = []
for step in plan:
if step["worker"] == "research":
tasks.append(async_research_worker(step["task"]))
elif step["worker"] == "writing":
tasks.append(async_writing_worker(step["task"]))
# Execute in parallel
results = await asyncio.gather(*tasks)
return {
"workers_used": [p["worker"] for p in plan],
"results": list(results)
}
# Usage
result = asyncio.run(parallel_supervisor("AI trends"))
print(result)
Specialized Supervisor Patterns
Expert Router
Route to domain-specific experts:
# script_id: day_051_supervisor_worker/supervisor_system
EXPERTS = {
"legal": "You are a legal expert. Provide legally-sound advice.",
"medical": "You are a medical professional. Provide health information.",
"technical": "You are a tech expert. Explain technical concepts.",
"financial": "You are a financial advisor. Provide financial guidance."
}
def expert_router(query: str) -> str:
"""Route to appropriate expert."""
# Classify query
classification = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": f"Classify this query into one of: {list(EXPERTS.keys())}. Respond with just the category."},
{"role": "user", "content": query}
]
)
expert_type = classification.choices[0].message.content.strip().lower()
expert_prompt = EXPERTS.get(expert_type, EXPERTS["technical"])
# Get expert response
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": expert_prompt},
{"role": "user", "content": query}
]
)
return f"[{expert_type.upper()} EXPERT]\n{response.choices[0].message.content}"
Recursive Supervisor
Handle complex tasks by breaking them down:
# script_id: day_051_supervisor_worker/supervisor_system
def recursive_supervisor(task: str, depth: int = 0, max_depth: int = 3) -> str:
"""Break down complex tasks recursively."""
if depth >= max_depth:
return f"[Leaf task] {task}"
# Check if task needs breakdown
analysis = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """Analyze this task.
If it's simple, respond: {"simple": true, "response": "direct answer"}
If complex, respond: {"simple": false, "subtasks": ["subtask1", "subtask2"]}"""},
{"role": "user", "content": task}
],
response_format={"type": "json_object"}
)
result = json.loads(analysis.choices[0].message.content)
if result.get("simple"):
return result["response"]
# Recursively handle subtasks
subtask_results = []
for subtask in result.get("subtasks", []):
sub_result = recursive_supervisor(subtask, depth + 1, max_depth)
subtask_results.append(sub_result)
# Combine results
return f"Task: {task}\nSubtasks completed:\n" + "\n".join(subtask_results)
Best Practices
1. Clear Worker Responsibilities
# script_id: day_051_supervisor_worker/clear_responsibilities
# Good: Specific, focused workers
research_worker = "Find factual information from reliable sources"
writing_worker = "Create engaging written content"
code_worker = "Write clean, tested code"
# Bad: Overlapping responsibilities
helper1 = "Help with stuff"
helper2 = "Also help with things"
2. Supervisor Context
# script_id: day_051_supervisor_worker/supervisor_context
# Include context in supervisor decisions
supervisor_prompt = f"""
Current task: {task}
Workers used so far: {workers_used}
Results so far: {summary_of_results}
Remaining workers: {available_workers}
Decide next action...
"""
3. Error Handling
# script_id: day_051_supervisor_worker/error_handling
def safe_worker_call(worker_fn, task: str, max_retries: int = 2) -> str:
"""Call worker with retry logic."""
for attempt in range(max_retries + 1):
try:
return worker_fn(task)
except Exception as e:
if attempt == max_retries:
return f"Worker failed after {max_retries} retries: {e}"
print(f"Retry {attempt + 1}...")
Summary
Quick Reference
# script_id: day_051_supervisor_worker/quick_reference
# Basic supervisor
def supervisor(task):
worker = decide_worker(task)
return workers[worker](task)
# Multi-worker
results = []
for step in plan:
results.append(workers[step.worker](step.task))
return combine(results)
# Parallel
results = await asyncio.gather(*[worker(task) for worker, task in assignments])
What's Next?
Now let's explore Adversarial Debate - where agents critique each other's work to find better solutions!