Even with the best prompts and JSON modes, LLMs sometimes produce invalid output. A production-ready system needs intelligent retry logic that can recover from errors, provide feedback to the LLM, and eventually succeed.
Coming from Software Engineering? Retry loops for LLMs are identical to retry patterns for any flaky external service — the same exponential backoff, jitter, and max-retries patterns from your HTTP client libraries (requests, axios) apply. The twist: you can send the error message back to the LLM so it learns from its mistake. Production retry systems follow the exact same patterns as circuit breakers in microservices (Hystrix, resilience4j, Polly). If you've implemented retry policies for database connections or API calls, this code will feel like home — just with token costs as an additional concern.
Why Retry Loops Matter
Common Failure Scenarios
| Scenario | Example | Recovery Strategy |
|---|---|---|
| Invalid JSON | `{name: "John"}` | Clean and repair |
| Missing fields | `{"name": "John"}` (missing age) | Ask to complete |
| Wrong types | `{"age": "thirty"}` | Ask for correction |
| Schema mismatch | Extra/wrong fields | Provide specific feedback |
| Rate limiting | API 429 error | Exponential backoff |
Basic Retry Pattern
# script_id: day_016_retry_loops/basic_retry_pattern
from openai import OpenAI
from pydantic import BaseModel, ValidationError
import json
import time
client = OpenAI()
class UserInfo(BaseModel):
    """Expected shape of the extracted user record."""
    name: str
    age: int
    email: str
def extract_with_retry(
    text: str,
    max_retries: int = 3
) -> UserInfo | None:
    """Extract user info from free text with basic retry logic.

    Args:
        text: Source text containing the user details.
        max_retries: Maximum number of API attempts before giving up.

    Returns:
        A validated UserInfo on success, or None if every attempt failed.
    """
    schema = UserInfo.model_json_schema()
    last_error = None
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "user",
                        "content": f"""Extract user info as JSON:
Schema: {json.dumps(schema)}
Text: {text}
Return only JSON."""
                    }
                ],
                temperature=0,
                response_format={"type": "json_object"}
            )
            data = json.loads(response.choices[0].message.content)
            return UserInfo(**data)
        except json.JSONDecodeError as e:
            last_error = f"JSON parse error: {e}"
        except ValidationError as e:
            last_error = f"Validation error: {e}"
        except Exception as e:  # broad on purpose: any failure becomes a retry
            last_error = f"Unexpected error: {e}"
        print(f"Attempt {attempt + 1} failed: {last_error}")
        if attempt < max_retries - 1:
            time.sleep(1)  # brief pause, but don't sleep after the final attempt
    print(f"All {max_retries} attempts failed")
    return None
# Usage: a clean, well-formed input should succeed on the first attempt
result = extract_with_retry("Contact John Doe, age 30, at john@email.com")
if result:  # None means every retry failed
    print(f"Success: {result}")
Intelligent Retry with Feedback
The real power comes from telling the LLM what went wrong:
# script_id: day_016_retry_loops/feedback_retry
from openai import OpenAI
from pydantic import BaseModel, ValidationError
import json
client = OpenAI()
class ProductReview(BaseModel):
    """Expected shape of an extracted product review."""
    product_name: str
    rating: float  # 1-5
    sentiment: str  # positive, negative, neutral
    summary: str
def extract_with_feedback(
    text: str,
    max_retries: int = 3
) -> ProductReview | None:
    """Extract a ProductReview, feeding error details back to the LLM.

    On each failed attempt, the model's bad output plus a targeted error
    message are appended to the conversation so it can self-correct.

    Args:
        text: Review text to extract from.
        max_retries: Maximum number of attempts.

    Returns:
        A validated ProductReview, or None if all attempts failed.
    """
    schema = ProductReview.model_json_schema()
    messages = [
        {
            "role": "system",
            "content": """You are a JSON extraction assistant.
Extract information and return valid JSON matching the schema.
If you receive error feedback, correct your response accordingly."""
        },
        {
            "role": "user",
            "content": f"""Extract product review info from this text.
Schema: {json.dumps(schema, indent=2)}
Text: {text}
Return only valid JSON."""
        }
    ]
    content = None  # last raw model output; stays None until a response arrives
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                temperature=0,
                response_format={"type": "json_object"}
            )
            content = response.choices[0].message.content
            data = json.loads(content)
            result = ProductReview(**data)
            print(f"Success on attempt {attempt + 1}")
            return result
        except json.JSONDecodeError as e:
            error_feedback = f"JSON parsing failed: {e}. Please return valid JSON."
        except ValidationError as e:
            # Build per-field feedback so the model knows exactly what to fix
            error_details = []
            for err in e.errors():
                field = ".".join(str(x) for x in err["loc"])
                error_details.append(f"- {field}: {err['msg']}")
            error_feedback = f"""Validation failed with these errors:
{chr(10).join(error_details)}
Please fix these issues and return corrected JSON."""
        # Replaces the fragile `'content' in dir()` check: content is always
        # bound here because we initialized it before the loop.
        messages.append({
            "role": "assistant",
            "content": content if content is not None else "Invalid response"
        })
        messages.append({
            "role": "user",
            "content": error_feedback
        })
        print(f"Attempt {attempt + 1} failed, providing feedback...")
    print(f"All {max_retries} attempts failed")
    return None
# Test with challenging input: informal text with no explicit rating field
review_text = """
OMG this phone is AMAZING!!! Battery lasts forever, camera is insane.
Definitely 5 stars, would buy again!!! Best purchase of 2024!
"""
result = extract_with_feedback(review_text)
# None means all retries were exhausted without producing a valid ProductReview
if result:
    print(f"\nExtracted: {result.model_dump_json(indent=2)}")
Exponential Backoff for Rate Limits
# script_id: day_016_retry_loops/exponential_backoff
from openai import OpenAI, RateLimitError, APIError
import time
import random
client = OpenAI()
def call_with_backoff(
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0
) -> str:
    """Make an API call, retrying rate limits and 5xx errors with backoff.

    Args:
        messages: Chat messages to send.
        max_retries: Maximum number of attempts.
        base_delay: Starting delay in seconds; doubles each attempt.

    Returns:
        The assistant's reply text.

    Raises:
        RateLimitError: If still rate-limited on the final attempt.
        APIError: On non-retryable client errors, or a 5xx that persists
            through all retries.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return response.choices[0].message.content
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter to avoid thundering-herd retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Waiting {delay:.2f}s... (Attempt {attempt + 1})")
            time.sleep(delay)
        except APIError as e:
            # status_code can be None on some APIError subclasses; comparing
            # None >= 500 would raise TypeError, so check explicitly.
            if e.status_code is not None and e.status_code >= 500:
                if attempt == max_retries - 1:
                    raise  # keep the original error instead of a generic one
                delay = base_delay * (2 ** attempt)
                print(f"Server error. Waiting {delay:.2f}s...")
                time.sleep(delay)
            else:
                raise  # Client error, don't retry
    raise Exception("Max retries exceeded")
# Usage: returns the reply text, or raises once retries are exhausted
result = call_with_backoff([{"role": "user", "content": "Hello"}])
Complete Production Retry System
Here's a fully-featured retry system:
# script_id: day_016_retry_loops/production_retry_system
import json
import logging
import random
import time
from dataclasses import dataclass, field
from typing import TypeVar, Type, Callable, Any

from openai import OpenAI, RateLimitError, APIError, APIConnectionError
from pydantic import BaseModel, ValidationError
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = OpenAI()
T = TypeVar('T', bound=BaseModel)
@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 3  # attempts per extraction
    base_delay: float = 1.0  # starting backoff delay, in seconds
    max_delay: float = 60.0  # cap applied to any single backoff delay
    exponential_base: float = 2.0  # delay multiplier per attempt
    jitter: bool = True  # add randomness so concurrent clients don't retry in lockstep
    retry_on_validation_error: bool = True  # re-prompt the LLM on schema failures
@dataclass
class RetryResult:
    """Result of a retry operation.

    Attributes:
        success: True if a validated result was produced.
        data: The validated model instance (None on failure).
        attempts: Number of attempts actually made.
        errors: Per-attempt error records accumulated along the way.
    """
    success: bool
    data: Any = None
    attempts: int = 0
    # field(default_factory=list) instead of `errors: list = None`: each
    # instance gets its own fresh list, and callers can iterate/append
    # without a None check.
    errors: list = field(default_factory=list)
class SmartRetry:
    """Smart retry system for LLM calls with Pydantic validation.

    Combines three recovery strategies in one extraction loop:
      - rate limits / connection errors: backoff and retry,
      - malformed JSON: re-prompt the model with the parse error,
      - schema violations: re-prompt with per-field validation feedback.
    """

    def __init__(self, config: RetryConfig = None):
        """Store the given config, falling back to defaults when None."""
        self.config = config or RetryConfig()

    def extract(
        self,
        text: str,
        schema: Type[T],
        system_prompt: str = None
    ) -> RetryResult:
        """Extract structured data with smart retries.

        Args:
            text: Source text to extract from.
            schema: Pydantic model class describing the expected output.
            system_prompt: Optional override for the default system prompt.

        Returns:
            A RetryResult carrying the validated model (on success), the
            number of attempts used, and every error encountered.
        """
        json_schema = schema.model_json_schema()
        errors = []
        messages = self._build_initial_messages(text, json_schema, system_prompt)
        for attempt in range(self.config.max_retries):
            logger.info(f"Attempt {attempt + 1}/{self.config.max_retries}")
            try:
                # Make API call (handles rate limits internally)
                response = self._call_api(messages)
                content = response.choices[0].message.content
                # Parse JSON, then validate with Pydantic
                data = json.loads(content)
                result = schema(**data)
                return RetryResult(
                    success=True,
                    data=result,
                    attempts=attempt + 1,
                    errors=errors
                )
            except RateLimitError as e:
                errors.append(self._handle_rate_limit(attempt, e))
            except APIConnectionError as e:
                errors.append(self._handle_connection_error(attempt, e))
            except json.JSONDecodeError as e:
                errors.append({"type": "json_error", "message": str(e)})
                # Feed the parse error back so the model can correct itself
                messages = self._add_json_error_feedback(messages, content, e)
            except ValidationError as e:
                errors.append({"type": "validation_error", "details": e.errors()})
                if not self.config.retry_on_validation_error:
                    break
                # Feed per-field validation errors back to the model
                messages = self._add_validation_feedback(messages, content, e)
        return RetryResult(
            success=False,
            attempts=self.config.max_retries,
            errors=errors
        )

    def _build_initial_messages(
        self,
        text: str,
        schema: dict,
        system_prompt: str
    ) -> list:
        """Build the initial system + user message list for the extraction."""
        system = system_prompt or """You are a precise JSON extraction assistant.
Extract information and return valid JSON matching the provided schema.
If you receive error feedback, carefully correct your response."""
        return [
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": f"""Extract information from the following text.
Schema:
{json.dumps(schema, indent=2)}
Text:
{text}
Return only valid JSON matching the schema."""
            }
        ]

    def _call_api(self, messages: list):
        """Make the API call, retrying rate limits a few times internally.

        Raises:
            RateLimitError: the last rate-limit error, once the inner
                retries are exhausted.
        """
        last_rate_limit = None
        for rate_attempt in range(3):  # Inner retry for rate limits
            try:
                return client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages,
                    temperature=0,
                    response_format={"type": "json_object"}
                )
            except RateLimitError as e:
                last_rate_limit = e
                delay = self._calculate_delay(rate_attempt)
                logger.warning(f"Rate limited, waiting {delay:.2f}s")
                time.sleep(delay)
        # Re-raise the original error. Hand-constructing RateLimitError
        # (as the old code did) fails because the openai exception's
        # __init__ requires response/body arguments, not a bare message.
        raise last_rate_limit

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff, a cap, and jitter."""
        delay = self.config.base_delay * (self.config.exponential_base ** attempt)
        delay = min(delay, self.config.max_delay)
        if self.config.jitter:
            # Up to +10% randomness so concurrent clients desynchronize
            delay += random.uniform(0, delay * 0.1)
        return delay

    def _handle_rate_limit(self, attempt: int, error) -> dict:
        """Sleep through a rate limit and record it as an error entry."""
        delay = self._calculate_delay(attempt)
        logger.warning(f"Rate limited. Waiting {delay:.2f}s")
        time.sleep(delay)
        return {"type": "rate_limit", "delay": delay}

    def _handle_connection_error(self, attempt: int, error) -> dict:
        """Sleep through a connection error (linear backoff, capped at 5s)."""
        delay = min(5, self.config.base_delay * (attempt + 1))
        logger.warning(f"Connection error. Waiting {delay:.2f}s")
        time.sleep(delay)
        return {"type": "connection_error", "message": str(error)}

    def _add_json_error_feedback(
        self,
        messages: list,
        content: str,
        error: json.JSONDecodeError
    ) -> list:
        """Append the bad output plus the JSON parse error to the conversation."""
        messages.append({"role": "assistant", "content": content})
        messages.append({
            "role": "user",
            "content": f"""Your response was not valid JSON.
Error: {error}
Please return ONLY valid JSON with no additional text or formatting."""
        })
        return messages

    def _add_validation_feedback(
        self,
        messages: list,
        content: str,
        error: ValidationError
    ) -> list:
        """Append the bad output plus per-field validation errors to the conversation."""
        error_details = []
        for err in error.errors():
            loc = ".".join(str(x) for x in err["loc"])
            error_details.append(f"- Field '{loc}': {err['msg']} (got: {err.get('input', 'N/A')})")
        messages.append({"role": "assistant", "content": content})
        messages.append({
            "role": "user",
            "content": f"""The JSON has validation errors:
{chr(10).join(error_details)}
Please fix these specific issues and return corrected JSON."""
        })
        return messages
# Usage Example
class OrderInfo(BaseModel):
    """Expected shape of an extracted customer order."""
    customer_name: str
    items: list[str]
    total: float
    shipping_address: str
# Build a retry system that re-prompts on validation failures
retry_system = SmartRetry(RetryConfig(
    max_retries=3,
    retry_on_validation_error=True
))
order_text = """
Order from John Smith
Items: 2x Widget ($10 each), 1x Gadget ($25)
Ship to: 123 Main St, Boston MA
Total: $45.00
"""
result = retry_system.extract(order_text, OrderInfo)
# RetryResult reports both the outcome and the errors seen along the way
if result.success:
    print(f"Extracted in {result.attempts} attempt(s):")
    print(result.data.model_dump_json(indent=2))
else:
    print(f"Failed after {result.attempts} attempts")
    print(f"Errors: {result.errors}")
Async Retry System
For high-throughput applications:
# script_id: day_016_retry_loops/async_retry_system
import asyncio
from openai import AsyncOpenAI
from pydantic import BaseModel, ValidationError
import json
async_client = AsyncOpenAI()
async def extract_with_async_retry(
    text: str,
    schema_class: type[BaseModel],
    max_retries: int = 3
) -> BaseModel | None:
    """Extract structured data asynchronously, retrying on parse/validation errors.

    Each failed attempt echoes the bad output and the error back to the
    model before trying again. Returns None once retries are exhausted.
    """
    json_schema = schema_class.model_json_schema()
    conversation = [
        {"role": "user", "content": f"Extract as JSON matching: {json.dumps(json_schema)}\n\nText: {text}"}
    ]
    for attempt_no in range(1, max_retries + 1):
        try:
            reply = await async_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=conversation,
                temperature=0,
                response_format={"type": "json_object"}
            )
            payload = json.loads(reply.choices[0].message.content)
            return schema_class(**payload)
        except (json.JSONDecodeError, ValidationError) as exc:
            if attempt_no < max_retries:
                # Feed the failed output and the error back for self-correction
                conversation.append(
                    {"role": "assistant", "content": reply.choices[0].message.content}
                )
                conversation.append(
                    {"role": "user", "content": f"Error: {exc}. Please fix and return valid JSON."}
                )
                await asyncio.sleep(1)
    return None
# Batch processing with retries
async def batch_extract(
    texts: list[str],
    schema_class: type[BaseModel],
    max_concurrent: int = 5
) -> list:
    """Run extract_with_async_retry over many texts with bounded concurrency.

    Results come back in the same order as the inputs.
    """
    limiter = asyncio.Semaphore(max_concurrent)

    async def bounded(item: str):
        # The semaphore caps how many requests are in flight at once
        async with limiter:
            return await extract_with_async_retry(item, schema_class)

    return await asyncio.gather(*(bounded(t) for t in texts))
# Usage
class Person(BaseModel):
    """Expected shape of a person extracted from a short sentence."""
    name: str
    age: int
async def main():
    """Run the batch extractor over a few sample sentences."""
    texts = [
        "John is 30 years old",
        "Sarah, age 25",
        "Mike (45)"
    ]
    results = await batch_extract(texts, Person)
    # asyncio.gather preserves input order, so zip pairs each text correctly
    for text, result in zip(texts, results):
        print(f"{text} -> {result}")
asyncio.run(main())
Summary
Quick Reference
# script_id: day_016_retry_loops/quick_reference
# Basic retry pattern (pseudo-code: assumes make_call/max_retries in scope)
for attempt in range(max_retries):
    try:
        result = make_call()
        return result
    except Exception as e:
        if attempt == max_retries - 1:
            raise  # out of attempts: surface the last error
        time.sleep(2 ** attempt)  # Exponential backoff
# With feedback: echo the bad output and the error back to the model
messages.append({"role": "assistant", "content": bad_response})
messages.append({"role": "user", "content": f"Error: {e}. Please fix."})
Exercises
1. **Retry Dashboard**: Build a system that tracks retry statistics (success rate, average attempts, common errors)
2. **Adaptive Retry**: Create a retry system that adjusts its strategy based on error patterns
3. **Circuit Breaker**: Implement a circuit breaker that stops retrying after too many failures
Congratulations!
You've completed Month 1! You now understand:
- How LLMs work (Transformers, tokenization, sampling)
- Advanced prompting (few-shot, CoT, system prompts)
- API mastery (SDKs, async, streaming)
- Structured output (Pydantic, JSON modes, retries)
Next up, one more foundations topic: DSPy — replacing manual prompt engineering with programmatic optimization!