Production systems must handle failures gracefully. This guide shows you how to implement resilient API calls.
Coming from Software Engineering? This is resilience engineering — you've implemented all of these patterns before. Exponential backoff with jitter for retries, circuit breakers (Hystrix/Resilience4j pattern) to prevent cascade failures, rate limiting to stay within quotas. The only new concept is token-based rate limiting, where APIs limit you by tokens-per-minute rather than requests-per-minute. Your existing retry logic, backoff strategies, and circuit breaker implementations transfer directly.
Why Resilience Matters
Common issues:
- Rate limits: API says "slow down"
- Transient errors: Network blips
- Service unavailable: Backend overloaded
- Timeouts: Response too slow
Basic Retry with Backoff
# script_id: day_090_rate_limits_backoffs/basic_retry_with_backoff
import time
import random
from typing import Callable, Any
def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,),
) -> Any:
    """
    Retry a zero-argument callable with exponential backoff.

    Args:
        func: Function to retry (called with no arguments).
        max_retries: Maximum retry attempts (so up to max_retries + 1 calls).
        base_delay: Initial delay in seconds.
        max_delay: Maximum delay between retries (enforced even after jitter).
        exponential_base: Multiplier applied per retry.
        jitter: Add randomness to prevent thundering herd.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately (new parameter, default preserves the
            original catch-everything behavior).

    Returns:
        Whatever func() returns on the first successful attempt.

    Raises:
        The last exception seen, if every attempt fails.
    """
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return func()
        except exceptions as e:
            last_exception = e
            if attempt == max_retries:
                break  # out of retries; re-raise below
            # Exponential growth, capped so waits never explode.
            delay = min(base_delay * (exponential_base ** attempt), max_delay)
            if jitter:
                # Scale into [0.5x, 1.5x); re-cap afterwards — the original
                # code let jitter push the delay up to 1.5 * max_delay.
                delay = min(delay * (0.5 + random.random()), max_delay)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
            time.sleep(delay)
    raise last_exception
# Usage
def call_api():
    # NOTE(review): `openai` is not imported in this snippet — assumes an
    # OpenAI SDK module/client is in scope; `...` is a placeholder for real
    # request arguments. Verify before running.
    response = openai.chat.completions.create(...)
    return response

result = retry_with_backoff(call_api)
Decorator-Based Retry
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import functools
import time
import random
from typing import Tuple, Type
def retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    max_delay: float = 60.0,
):
    """Decorator for retry with exponential backoff and full jitter.

    Args:
        max_retries: Maximum retry attempts (up to max_retries + 1 calls).
        base_delay: Initial delay in seconds.
        exceptions: Exception types that trigger a retry; others propagate.
        max_delay: Cap on any single sleep (new parameter, appended last so
            existing positional callers are unaffected; previously the
            backoff grew without bound).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_retries:
                        break  # exhausted; re-raise below
                    # Exponential backoff with jitter in [0.5x, 1.5x),
                    # capped at max_delay.
                    delay = min(
                        base_delay * (2 ** attempt) * (0.5 + random.random()),
                        max_delay,
                    )
                    print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s")
                    time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
# Usage
# NOTE(review): RateLimitError, APIError, and `client` are not defined in
# this snippet — they come from the OpenAI SDK and the surrounding app;
# confirm the imports exist before running.
@retry(max_retries=3, exceptions=(RateLimitError, APIError))
def call_openai(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

result = call_openai("Hello!")
Rate Limiter
Proactively limit your request rate:
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import time
from collections import deque
from threading import Lock
class RateLimiter:
    """Sliding-window rate limiter.

    Keeps a log of request timestamps from the last 60 seconds and blocks
    acquire() until a slot frees up.

    NOTE(review): the original docstring said "token bucket", but no tokens
    are accumulated or refilled here — this is a sliding-window log.
    """
    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        # Average spacing between requests. Currently unused by this class;
        # kept for backward compatibility with any external readers.
        self.interval = 60.0 / requests_per_minute
        self.timestamps = deque()  # request times within the last 60 s
        self.lock = Lock()

    def acquire(self):
        """Wait until a request slot is available, then claim it.

        NOTE(review): the sleep happens while holding the lock, so all
        concurrent callers are serialized while one of them waits.
        """
        with self.lock:
            now = time.time()
            # Remove timestamps that have aged out of the 60-second window.
            while self.timestamps and now - self.timestamps[0] > 60:
                self.timestamps.popleft()
            # Check if we're at the limit
            if len(self.timestamps) >= self.rpm:
                # Wait until the oldest request falls out of the window,
                # then drop it to make room for this request.
                sleep_time = 60 - (now - self.timestamps[0])
                if sleep_time > 0:
                    print(f"Rate limit: waiting {sleep_time:.2f}s")
                    time.sleep(sleep_time)
                self.timestamps.popleft()
            self.timestamps.append(time.time())

    def __call__(self, func):
        """Use as decorator: each call acquires a slot before running."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.acquire()
            return func(*args, **kwargs)
        return wrapper
# Usage
limiter = RateLimiter(requests_per_minute=20)

@limiter
def call_api():
    # NOTE(review): `client` is assumed to be an OpenAI client defined
    # elsewhere; `...` is a placeholder for real arguments.
    return client.chat.completions.create(...)

# Or manual
limiter.acquire()
result = call_api()
Circuit Breaker
Stop calling failing services:
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open.

    Defined before CircuitBreaker (the original defined it after the class
    that raises it — legal at runtime, but confusing to read).
    """
    pass


class CircuitBreaker:
    """
    Circuit breaker pattern implementation.

    States:
        - CLOSED: normal operation, requests go through
        - OPEN: service failing, reject requests immediately
        - HALF_OPEN: testing recovery, allow a probe request

    Fixes vs. the original:
        - Recovery timing uses time.monotonic(), which cannot jump when the
          wall clock is adjusted (time.time() can).
        - A failure while HALF_OPEN re-opens the circuit immediately; the
          original relied on the failure count still being >= threshold.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        expected_exceptions: tuple = (Exception,)
    ):
        self.failure_threshold = failure_threshold  # failures before opening
        self.recovery_timeout = recovery_timeout    # seconds before half-open probe
        self.expected_exceptions = expected_exceptions  # only these count as failures
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None               # monotonic stamp of last failure
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """Execute func(*args, **kwargs) through the circuit breaker.

        Returns:
            Whatever func returns on success.

        Raises:
            CircuitBreakerOpen: if the circuit is open and the recovery
                timeout has not elapsed.
            Whatever func raises otherwise (the failure is counted first).
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                # Allow a probe once the recovery timeout has passed.
                if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                    print("Circuit half-open, testing...")
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpen("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
        except self.expected_exceptions:
            with self.lock:
                self.failure_count += 1
                self.last_failure_time = time.monotonic()
                # Open on threshold, or immediately if the probe failed.
                if (self.state == CircuitState.HALF_OPEN
                        or self.failure_count >= self.failure_threshold):
                    print(f"Circuit opened after {self.failure_count} failures")
                    self.state = CircuitState.OPEN
            raise
        with self.lock:
            # Success: reset the failure streak; close if we were probing.
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                print("Circuit closed (recovered)")
                self.state = CircuitState.CLOSED
        return result

    def __call__(self, func):
        """Use as a decorator."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper
# Usage
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

@circuit
def call_external_api():
    # NOTE(review): assumes `requests` is imported elsewhere in the app.
    return requests.get("https://api.example.com")

# Manual usage
try:
    result = circuit.call(call_external_api)
except CircuitBreakerOpen:
    # Circuit is open: skip the network call entirely and degrade gracefully.
    print("Service unavailable, using fallback")
    result = fallback_response()  # NOTE(review): defined elsewhere — confirm
Combined Resilience
Put it all together:
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
from dataclasses import dataclass
from typing import Optional, Callable
import time
import random
@dataclass
class ResilienceConfig:
    """Tunable knobs for ResilientClient: retries, rate limit, breaker."""
    max_retries: int = 3                    # retry attempts after the first call
    base_delay: float = 1.0                 # initial backoff in seconds
    max_delay: float = 60.0                 # cap on any single backoff sleep
    requests_per_minute: int = 60           # passed to RateLimiter
    circuit_failure_threshold: int = 5      # failures before the circuit opens
    circuit_recovery_timeout: float = 30.0  # seconds before a half-open probe
class ResilientClient:
    """API client combining rate limiting, retries, and a circuit breaker.

    Per attempt: acquire a rate-limit slot, then call through the circuit
    breaker. CircuitBreakerOpen is never retried — if the circuit is open,
    immediate retries would be pointless by design.
    """

    def __init__(self, config: Optional[ResilienceConfig] = None):
        # Fix: the parameter was annotated `ResilienceConfig` while
        # defaulting to None; Optional[...] states the actual contract.
        self.config = config or ResilienceConfig()
        self.rate_limiter = RateLimiter(self.config.requests_per_minute)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=self.config.circuit_failure_threshold,
            recovery_timeout=self.config.circuit_recovery_timeout
        )

    def call(self, func: Callable, *args, **kwargs):
        """Make a resilient API call.

        Raises:
            CircuitBreakerOpen: propagated immediately, never retried.
            The last exception seen, if all retries are exhausted.
        """
        last_exception = None
        for attempt in range(self.config.max_retries + 1):
            try:
                # Rate limiting (may block to stay within RPM).
                self.rate_limiter.acquire()
                # Circuit breaker guards the actual call.
                return self.circuit_breaker.call(func, *args, **kwargs)
            except CircuitBreakerOpen:
                # Don't retry if circuit is open.
                raise
            except Exception as e:
                last_exception = e
                if attempt == self.config.max_retries:
                    break
                # Exponential backoff with jitter; re-cap after jitter —
                # the original could exceed max_delay by up to 1.5x.
                delay = min(
                    self.config.base_delay * (2 ** attempt),
                    self.config.max_delay
                )
                delay = min(delay * (0.5 + random.random()), self.config.max_delay)
                print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s")
                time.sleep(delay)
        raise last_exception
# Usage
client = ResilientClient(ResilienceConfig(
    max_retries=3,
    requests_per_minute=20,
    circuit_failure_threshold=5
))

def make_api_call():
    # NOTE(review): `openai_client` is assumed to be initialized elsewhere;
    # `...` is a placeholder for real request arguments.
    return openai_client.chat.completions.create(...)

result = client.call(make_api_call)
Using Tenacity Library
For production, use the tenacity library:
pip install tenacity
# script_id: day_090_rate_limits_backoffs/tenacity_retry
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
import logging
# tenacity's before_sleep_log needs a configured logger to emit anything.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# NOTE(review): RateLimitError / APIError and `client` are not defined in
# this snippet — import them from the OpenAI SDK in the real module.
@retry(
    stop=stop_after_attempt(3),                          # at most 3 total attempts
    wait=wait_exponential(multiplier=1, min=1, max=60),  # 1s, 2s, 4s... capped at 60s
    retry=retry_if_exception_type((RateLimitError, APIError)),
    before_sleep=before_sleep_log(logger, logging.WARNING)  # log each retry wait
)
def call_openai(prompt: str) -> str:
    """Call OpenAI with automatic retry."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
# More complex configuration
from tenacity import retry, RetryError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type(Exception),  # retries on *any* exception
    reraise=True  # re-raise the last exception instead of wrapping in RetryError
)
async def async_api_call():
    """Async API call with retry."""
    # NOTE(review): `aiohttp` is not imported and `url` is not defined in
    # this snippet — both must come from the surrounding module.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
Summary
Quick Reference
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
# Quick reference: the minimal form of each pattern defined above.

# Simple retry
@retry(max_retries=3, base_delay=1.0)
def api_call():
    ...

# Rate limiter
limiter = RateLimiter(requests_per_minute=60)
limiter.acquire()

# Circuit breaker
circuit = CircuitBreaker(failure_threshold=5)
result = circuit.call(api_call)

# Tenacity
# NOTE(review): this import shadows the local @retry decorator used above.
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def resilient_call():
    ...
Token Budget Management
In production, you need to prevent cost explosions. Token budgets enforce limits per user, per organization, or per time window.
# script_id: day_090_rate_limits_backoffs/token_budget
import time
from dataclasses import dataclass, field
@dataclass
class TokenBudget:
    """Track and enforce token usage limits over a sliding one-hour window.

    Attributes:
        max_tokens_per_hour: hourly cap across all requests.
        max_tokens_per_request: cap for any single request.
        usage_log: (timestamp, tokens) pairs recorded within the last hour.
    """
    max_tokens_per_hour: int = 100_000
    max_tokens_per_request: int = 10_000
    usage_log: list = field(default_factory=list)

    def _prune_expired(self):
        """Drop usage entries older than one hour.

        Extracted: this pruning was duplicated verbatim in check_budget
        and remaining_budget.
        """
        cutoff = time.time() - 3600
        self.usage_log = [(t, tokens) for t, tokens in self.usage_log if t > cutoff]

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True if a request of estimated_tokens fits both limits."""
        if estimated_tokens > self.max_tokens_per_request:
            return False
        self._prune_expired()
        # Check the hourly budget against what is still in the window.
        hourly_usage = sum(tokens for _, tokens in self.usage_log)
        return (hourly_usage + estimated_tokens) <= self.max_tokens_per_hour

    def record_usage(self, tokens_used: int):
        """Record actual token usage after a request completes."""
        self.usage_log.append((time.time(), tokens_used))

    @property
    def remaining_budget(self) -> int:
        """Tokens still available in the current one-hour window."""
        self._prune_expired()
        used = sum(tokens for _, tokens in self.usage_log)
        return max(0, self.max_tokens_per_hour - used)
# Per-user budgets
user_budgets: dict[str, TokenBudget] = {}

def get_user_budget(user_id: str) -> TokenBudget:
    """Return the budget for user_id, creating a free-tier one on first use."""
    # EAFP: the common case is an existing budget, so try the lookup first.
    try:
        return user_budgets[user_id]
    except KeyError:
        budget = TokenBudget(
            max_tokens_per_hour=50_000,  # Free tier
            max_tokens_per_request=4_000
        )
        user_budgets[user_id] = budget
        return budget
# Usage in your API handler
def handle_request(user_id: str, prompt: str):
    """Check the user's token budget, call the LLM, then record actual usage."""
    budget = get_user_budget(user_id)
    # Rough pre-call estimate: ~2 tokens per whitespace-separated word.
    # NOTE(review): use a real tokenizer for accurate estimates — confirm.
    estimated = len(prompt.split()) * 2  # Rough estimate
    if not budget.check_budget(estimated):
        raise Exception(f"Token budget exceeded. Remaining: {budget.remaining_budget}")
    # NOTE(review): `call_llm` is defined elsewhere; assumes the response
    # exposes OpenAI-style `.usage.total_tokens` — verify against caller.
    response = call_llm(prompt)
    budget.record_usage(response.usage.total_tokens)
    return response
Coming from Software Engineering? Token budgets are API rate limiting measured in tokens instead of requests. The same patterns — sliding window, per-user quotas, tiered plans — apply. The key difference: a single LLM request can vary from 100 to 100,000 tokens, so request-count limits alone are insufficient.
What's Next?
Now let's deploy to the cloud with AWS, GCP, or platforms like Render!