Production systems must handle failures gracefully. This guide shows you how to implement resilient API calls.
Coming from Software Engineering? This is resilience engineering — you've implemented all of these patterns before. Exponential backoff with jitter for retries, circuit breakers (Hystrix/Resilience4j pattern) to prevent cascade failures, rate limiting to stay within quotas. The only new concept is token-based rate limiting, where APIs limit you by tokens-per-minute (a token is roughly a chunk of a word — the unit LLMs meter and bill by, about 3/4 of a word on average) rather than requests-per-minute. Your existing retry logic, backoff strategies, and circuit breaker implementations transfer directly.
Why Resilience Matters
Common issues:
- Rate limits: API says "slow down"
- Transient errors: Network blips
- Service unavailable: Backend overloaded
- Timeouts: Response too slow
Basic Retry with Backoff
# script_id: day_090_rate_limits_backoffs/basic_retry_with_backoff
import time
import random
from typing import Callable, Any
def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    """
    Call ``func`` and retry it with exponential backoff whenever it raises.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Retries allowed after the first attempt, so ``func`` runs
            at most ``max_retries + 1`` times. Must be >= 0.
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds on any single sleep, jitter included.
        exponential_base: Factor the delay grows by on each retry.
        jitter: Add randomness so clients that all failed at the same instant
            don't all retry at the same instant (a thundering herd) and
            re-overload the API.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is negative.
        Exception: The exception raised by the final attempt once the retries
            are used up.
    """
    if max_retries < 0:
        # Without this guard the loop never runs and `raise None` fails
        # with an unrelated TypeError.
        raise ValueError("max_retries must be >= 0")

    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as e:
            last_exception = e
            if attempt == max_retries:
                break
            # Exponential growth, capped at max_delay.
            delay = min(base_delay * (exponential_base ** attempt), max_delay)
            if jitter:
                # Scale by a factor in [0.5, 1.5), then cap again so the
                # jittered sleep can never exceed max_delay.
                delay = min(delay * (0.5 + random.random()), max_delay)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
            time.sleep(delay)
    raise last_exception
# Usage
from openai import OpenAI
# openai v1: API methods live on a client instance, not on the module
client = OpenAI()


def call_api():
    """Send one "Hello!" chat message to gpt-4o-mini and return the raw completion."""
    greeting = {"role": "user", "content": "Hello!"}
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[greeting],
    )


if __name__ == "__main__":
    # Only hit the API when run as a script, never on import.
    result = retry_with_backoff(call_api)
Decorator-Based Retry
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import functools
import time
import random
from typing import Tuple, Type
def retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Build a decorator that retries the wrapped function with exponential backoff.

    Only exceptions matching ``exceptions`` trigger a retry; anything else
    propagates immediately. The wrapped function runs at most
    ``max_retries + 1`` times, and the last caught exception is raised once
    the retries are used up. Each wait is ``base_delay * 2**attempt`` scaled
    by a random factor in [0.5, 1.5).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            failure = None
            total_attempts = max_retries + 1
            for attempt in range(total_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as err:
                    failure = err
                    if attempt < max_retries:
                        # Retries remain: back off, then go around again.
                        jitter_factor = 0.5 + random.random()
                        delay = base_delay * (2 ** attempt) * jitter_factor
                        print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s")
                        time.sleep(delay)
                        continue
                    break
            raise failure
        return wrapper
    return decorator
# Usage
from openai import OpenAI, RateLimitError, APIError, APIConnectionError, InternalServerError

client = OpenAI()

# Retry only failures that can succeed on a later attempt: 429 rate limits,
# connection problems/timeouts, and 5xx server errors. The broad APIError base
# class also covers bad-request and authentication errors, which fail the same
# way every time, so retrying them only burns quota and time.
TRANSIENT_ERRORS = (RateLimitError, APIConnectionError, InternalServerError)


@retry(max_retries=3, exceptions=TRANSIENT_ERRORS)
def call_openai(prompt: str) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    Transient API failures are retried up to 3 times with backoff; any other
    exception propagates on the first occurrence.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    result = call_openai("Hello!")
Rate Limiter
Proactively limit your request rate:
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import time
from collections import deque
from threading import Lock
class RateLimiter:
    """Sliding-window rate limiter.

    Records the time of every request granted during the last 60 seconds and
    makes acquire() block once ``requests_per_minute`` of them are on record.
    This is a sliding-window log rather than a token bucket: there is no
    refill rate, only a cap on requests inside any rolling 60-second window.

    Can be used directly (``limiter.acquire()``) or as a decorator
    (``@limiter``), which calls acquire() before every invocation.
    """

    _WINDOW_SECONDS = 60.0

    def __init__(self, requests_per_minute: int = 60):
        """Create a limiter allowing ``requests_per_minute`` calls per window.

        Raises:
            ValueError: If ``requests_per_minute`` is not positive.
        """
        if requests_per_minute <= 0:
            raise ValueError("requests_per_minute must be a positive integer")
        self.rpm = requests_per_minute
        # Average spacing between requests; kept for callers, acquire() does not use it.
        self.interval = 60.0 / requests_per_minute
        self.timestamps = deque()
        self.lock = Lock()

    def acquire(self):
        """Wait until a request slot is available, then claim it."""
        # The lock stays held while sleeping, so concurrent callers queue up
        # behind this one instead of racing for the slot about to free up.
        with self.lock:
            # Monotonic clock: wall-clock adjustments must not stretch or
            # shrink the window.
            now = time.monotonic()
            # Forget requests that have left the window.
            while self.timestamps and now - self.timestamps[0] > self._WINDOW_SECONDS:
                self.timestamps.popleft()
            if len(self.timestamps) >= self.rpm:
                # Window is full: wait for the oldest request to age out.
                sleep_time = self._WINDOW_SECONDS - (now - self.timestamps[0])
                if sleep_time > 0:
                    print(f"Rate limit: waiting {sleep_time:.2f}s")
                    time.sleep(sleep_time)
                self.timestamps.popleft()
            self.timestamps.append(time.monotonic())

    def __call__(self, func):
        """Use as decorator: acquire a slot before each call to ``func``."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.acquire()
            return func(*args, **kwargs)
        return wrapper
# Usage
limiter = RateLimiter(requests_per_minute=20)


@limiter
def call_api():
    """Decorated form: a slot is acquired automatically before every call."""
    return "ok"  # stand-in for a real client.chat.completions.create(...) call


def call_api_unlimited():
    """Undecorated stand-in, used to show the manual form below."""
    return "ok"


# Or manual: acquire a slot yourself, then make an undecorated call.
# Pairing limiter.acquire() with the decorated call_api() would spend two
# slots on a single request, because the decorator acquires one as well.
limiter.acquire()
result = call_api_unlimited()
Circuit Breaker
Stop calling failing services:
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """States a CircuitBreaker moves between."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreakerOpen(Exception):
    """Raised by CircuitBreaker.call() when the circuit is open and the call is rejected."""


class CircuitBreaker:
    """
    Circuit breaker pattern implementation.

    States:
    - CLOSED: Normal operation, requests go through
    - OPEN: Service failing, reject requests immediately
    - HALF_OPEN: Testing recovery; a success closes the circuit, a failure re-opens it

    Args:
        failure_threshold: Consecutive counted failures that open the circuit.
        recovery_timeout: Seconds the circuit stays open before a trial call is allowed.
        expected_exceptions: Exception types counted as failures; other
            exceptions propagate without touching the breaker state.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        expected_exceptions: tuple = (Exception,)
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exceptions = expected_exceptions
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        # time.monotonic() reading of the most recent failure (None until one occurs).
        self.last_failure_time = None
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` through the circuit breaker.

        Returns:
            The return value of ``func``.

        Raises:
            CircuitBreakerOpen: If the circuit is open and the recovery
                timeout has not elapsed; ``func`` is not called.
            Exception: Whatever ``func`` raises is re-raised unchanged.
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                # Monotonic clock: a wall-clock adjustment must not shorten or
                # extend the time the circuit stays open.
                if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                    print("Circuit half-open, testing...")
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpen("Circuit breaker is open")

        # The lock is released while func runs so slow calls don't block other callers.
        try:
            result = func(*args, **kwargs)
        except self.expected_exceptions:
            with self.lock:
                self.failure_count += 1
                self.last_failure_time = time.monotonic()
                # A failed trial call re-opens the circuit outright, without
                # relying on the counter still being at the threshold.
                if (
                    self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold
                ):
                    print(f"Circuit opened after {self.failure_count} failures")
                    self.state = CircuitState.OPEN
            raise

        with self.lock:
            # Success - reset failures
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                print("Circuit closed (recovered)")
                self.state = CircuitState.CLOSED
        return result

    def __call__(self, func):
        """Use as decorator: route every call to ``func`` through call()."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper
# Usage
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=60)


def fallback_response():
    """Degraded answer served while the circuit is open."""
    return {"ok": False, "cached": True}


@circuit
def call_external_api():
    """Decorated form: every invocation already goes through ``circuit``."""
    return {"ok": True}  # stand-in for a real API call


# The decorator routes the call through the breaker, so call the function
# directly. circuit.call(fn) is the manual form for an undecorated fn; wrapping
# the decorated function in circuit.call(...) again would pass through the
# breaker twice and count every failure twice.
try:
    result = call_external_api()
except CircuitBreakerOpen:
    print("Service unavailable, using fallback")
    result = fallback_response()
Provider Fallback
The resilience patterns above protect a single provider. For an outage, the next layer is a fallback chain: catch a provider-level failure (or open circuit) and retry the request on a different provider or model, degrading to a cheaper/static response only as a last resort. It's the circuit breaker above plus a try-the-next-one loop — keep a small ordered list of providers and walk it on failure, tracking health so you don't keep hammering a dead one.
Combined Resilience
Put it all together:
# script_id: day_090_rate_limits_backoffs/resilience_toolkit
from dataclasses import dataclass
from typing import Optional, Callable
import time
import random
@dataclass
class ResilienceConfig:
    """Settings for retry/backoff, rate limiting, and the circuit breaker."""

    # Retry and backoff
    max_retries: int = 3
    base_delay: float = 1.0    # seconds
    max_delay: float = 60.0    # seconds

    # Rate limiting
    requests_per_minute: int = 60

    # Circuit breaker
    circuit_failure_threshold: int = 5
    circuit_recovery_timeout: float = 30.0    # seconds
class ResilientClient:
    """API client with rate limiting, retries, and circuit breaker.

    Each attempt first waits for a rate-limiter slot, then runs the call
    through the circuit breaker. Failures are retried with exponential backoff
    plus jitter; an open circuit is never retried.
    """

    def __init__(self, config: Optional[ResilienceConfig] = None):
        """Build the limiter and breaker from ``config`` (defaults if omitted)."""
        self.config = config or ResilienceConfig()
        self.rate_limiter = RateLimiter(self.config.requests_per_minute)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=self.config.circuit_failure_threshold,
            recovery_timeout=self.config.circuit_recovery_timeout
        )

    def call(self, func: Callable, *args, **kwargs):
        """Make a resilient API call.

        Returns:
            The return value of ``func(*args, **kwargs)``.

        Raises:
            CircuitBreakerOpen: Immediately, if the breaker rejects the call.
            Exception: The exception from the final attempt once
                ``config.max_retries`` retries are used up.
        """
        last_exception = None
        for attempt in range(self.config.max_retries + 1):
            try:
                # Rate limiting
                self.rate_limiter.acquire()
                # Circuit breaker
                return self.circuit_breaker.call(func, *args, **kwargs)
            except CircuitBreakerOpen:
                # Don't retry if circuit is open
                raise
            except Exception as e:
                last_exception = e
                if attempt == self.config.max_retries:
                    break
                # Exponential backoff
                delay = min(
                    self.config.base_delay * (2 ** attempt),
                    self.config.max_delay
                )
                # Jitter in [0.5, 1.5), capped again so the sleep never
                # exceeds max_delay.
                delay = min(delay * (0.5 + random.random()), self.config.max_delay)
                print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s")
                time.sleep(delay)
        raise last_exception
# Usage
# Named resilient_client rather than client so it does not rebind the OpenAI
# `client` that call_openai() in this same script looks up at call time.
resilient_client = ResilientClient(ResilienceConfig(
    max_retries=3,
    requests_per_minute=20,
    circuit_failure_threshold=5
))


def make_api_call():
    """Stand-in for a real client.chat.completions.create(...) call."""
    return "ok"


result = resilient_client.call(make_api_call)
Using Tenacity Library
You have now implemented backoff three ways by hand to understand the mechanics; in production, reach for the tenacity library instead:
pip install tenacity
# script_id: day_090_rate_limits_backoffs/tenacity_retry
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from openai import OpenAI, RateLimitError, APIError, APIConnectionError, InternalServerError

client = OpenAI()


@retry(
    stop=stop_after_attempt(3),  # 3 attempts in total, not 3 retries
    wait=wait_exponential(multiplier=1, min=1, max=60),
    # Retry only transient failures: 429 rate limits, connection
    # problems/timeouts, and 5xx server errors. The APIError base class also
    # covers bad-request and authentication errors, which never succeed on retry.
    retry=retry_if_exception_type((RateLimitError, APIConnectionError, InternalServerError)),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def call_openai(prompt: str) -> str:
    """Call OpenAI with automatic retry and return the reply text.

    Because ``reraise`` is not set, tenacity raises ``RetryError`` when the
    final attempt still fails with a retried exception type.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
# More complex configuration
from tenacity import retry, RetryError
from openai import AsyncOpenAI
aclient = AsyncOpenAI()


@retry(
    retry=retry_if_exception_type(Exception),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5),
    reraise=True
)
async def async_api_call():
    """Send "Hello!" through the async client and return the reply text.

    Any ``Exception`` triggers a retry, for at most 5 attempts. With
    ``reraise=True`` the underlying exception is raised after the last
    attempt instead of tenacity's ``RetryError``.
    """
    completion = await aclient.chat.completions.create(
        messages=[{"role": "user", "content": "Hello!"}],
        model="gpt-4o-mini",
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
Token Budget Management
In production, you need to prevent cost explosions. Token budgets enforce limits per user, per organization, or per time window.
# script_id: day_090_rate_limits_backoffs/token_budget
import time
from dataclasses import dataclass, field
@dataclass
class TokenBudget:
    """Rolling one-hour token allowance with a per-request ceiling.

    ``usage_log`` holds ``(timestamp, tokens)`` pairs; entries older than
    3600 seconds are dropped whenever the budget is checked or read.
    """

    max_tokens_per_hour: int = 100_000
    max_tokens_per_request: int = 10_000
    usage_log: list = field(default_factory=list)

    def _tokens_in_last_hour(self) -> int:
        """Prune entries older than one hour and return the tokens still counted."""
        oldest_allowed = time.time() - 3600
        self.usage_log = [
            (stamp, count) for stamp, count in self.usage_log if stamp > oldest_allowed
        ]
        return sum(count for _, count in self.usage_log)

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True when ``estimated_tokens`` fits both the per-request and hourly limits."""
        if estimated_tokens > self.max_tokens_per_request:
            return False
        projected = self._tokens_in_last_hour() + estimated_tokens
        return projected <= self.max_tokens_per_hour

    def record_usage(self, tokens_used: int):
        """Log ``tokens_used`` against the current time, after a request completes."""
        entry = (time.time(), tokens_used)
        self.usage_log.append(entry)

    @property
    def remaining_budget(self) -> int:
        """Tokens left in the current hour, never below zero. Prunes old entries as a side effect."""
        left = self.max_tokens_per_hour - self._tokens_in_last_hour()
        return left if left > 0 else 0
# Per-user budgets
# One TokenBudget per user id, created lazily by get_user_budget().
user_budgets: dict[str, TokenBudget] = {}


def get_user_budget(user_id: str) -> TokenBudget:
    """Return the budget for ``user_id``, creating a free-tier one on first use."""
    try:
        return user_budgets[user_id]
    except KeyError:
        # First request from this user: start them on the free tier.
        free_tier = TokenBudget(
            max_tokens_per_hour=50_000,
            max_tokens_per_request=4_000,
        )
        user_budgets[user_id] = free_tier
        return free_tier
class _Resp:
    """Minimal stand-in for an API response; exposes only ``usage.total_tokens``."""

    class usage:
        total_tokens = 0


def call_llm(prompt: str):
    """Stand-in for a real client.chat.completions.create(...) call.

    Ignores ``prompt`` and returns a fresh ``_Resp``.
    """
    placeholder = _Resp()
    return placeholder
# Usage in your API handler
class TokenBudgetExceeded(Exception):
    """Raised by handle_request() when a prompt does not fit the user's token budget."""


def handle_request(user_id: str, prompt: str):
    """Run ``prompt`` through the LLM if the user's token budget allows it.

    Returns:
        The response object returned by ``call_llm``.

    Raises:
        TokenBudgetExceeded: If the estimated prompt size fails the budget
            check. It subclasses ``Exception``, so existing
            ``except Exception`` handlers still catch it.
    """
    budget = get_user_budget(user_id)
    # Rough input-only estimate (~1.3 tokens/word); the reply is unknown until
    # the call returns, so add headroom and reconcile via record_usage() below.
    estimated = int(len(prompt.split()) * 1.3)
    if not budget.check_budget(estimated):
        raise TokenBudgetExceeded(
            f"Token budget exceeded. Remaining: {budget.remaining_budget}"
        )
    response = call_llm(prompt)
    # Charge the actual usage reported by the API, not the estimate.
    budget.record_usage(response.usage.total_tokens)
    return response
Coming from Software Engineering? Token budgets are API rate limiting measured in tokens instead of requests. The same patterns — sliding window, per-user quotas, tiered plans — apply. The key difference: a single LLM request can vary from 100 to 100,000 tokens — the same endpoint handles a one-line question and a request to summarize a 50-page document, and you pay for the size of the text in and out, not the number of calls — so request-count limits alone are insufficient.
Checkpoint
Run the basic_retry_with_backoff (or tenacity_retry) example against a function that raises a simulated 429 and confirm the base wait roughly doubles each attempt (1s, 2s, 4s…) before jitter is applied — the printed values will scatter between about half and one-and-a-half times that base, which is expected. To see the clean doubling, run with jitter=False. If it retries instantly with no delay, check that you're actually sleeping for the computed backoff and that jitter isn't collapsing the interval to near-zero.
Summary
Quick Reference
# script_id: day_090_rate_limits_backoffs/quick_reference
# fragment
# Simple retry (hand-written decorator: max_retries / base_delay)
@retry(max_retries=3, base_delay=1.0)
def api_call():
    pass


# Rate limiter: block until a request slot is free
limiter = RateLimiter(requests_per_minute=60)
limiter.acquire()

# Circuit breaker: route the call through the breaker
circuit = CircuitBreaker(failure_threshold=5)
result = circuit.call(api_call)

# Tenacity (this import rebinds the name `retry` from here on)
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def resilient_call():
    pass
Exercises
- Exponential backoff. Write a `retry_with_backoff` wrapper that retries a call on `429`/`RateLimitError` with delays of 1s, 2s, 4s, 8s and gives up after 5 tries.
- Add jitter. Improve it by adding random jitter to each delay and explain in one line why a thundering herd makes fixed delays dangerous.
- Respect the server. When the API returns a `Retry-After` header, sleep for that exact duration instead of your computed backoff.
- Per-user token budget. Implement a sliding-window token budget that blocks a user once they exceed N tokens/hour, and returns their remaining allowance.
Solutions (approaches)
- `for i in range(5): try: return fn() except RateLimitError: time.sleep(2 ** i)`; raise after the loop.
- `delay = (2 ** i) + random.uniform(0, 1)`; without jitter, all clients retry at the same instant and re-collide.
- `wait = int(e.response.headers.get("Retry-After", 2 ** i)); time.sleep(wait)`.
- Keep `(timestamp, tokens)` entries; drop entries older than 3600s; `remaining = max_per_hour - sum(tokens)`.
What's Next?
Now let's cut cost and latency with Semantic Caching — reusing answers for questions that mean the same thing, not just exact-match strings.