Phase 7 · Production Deployment · 8 min read

Rate Limits, Exponential Backoffs & Circuit Breakers

Phase 7 of 8

Production systems must handle failures gracefully. This guide shows you how to implement resilient API calls.

Coming from Software Engineering? This is resilience engineering — you've implemented all of these patterns before. Exponential backoff with jitter for retries, circuit breakers (Hystrix/Resilience4j pattern) to prevent cascade failures, rate limiting to stay within quotas. The only new concept is token-based rate limiting, where APIs limit you by tokens-per-minute rather than requests-per-minute. Your existing retry logic, backoff strategies, and circuit breaker implementations transfer directly.


Why Resilience Matters

Common issues:

  • Rate limits: API says "slow down"
  • Transient errors: Network blips
  • Service unavailable: Backend overloaded
  • Timeouts: Response too slow

Basic Retry with Backoff

# script_id: day_090_rate_limits_backoffs/basic_retry_with_backoff
import time
import random
from typing import Callable, Any

def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    """
    Call *func*, retrying on any exception with exponential backoff.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Number of retries after the initial attempt.
        base_delay: Delay before the first retry, in seconds.
        max_delay: Upper bound on any single delay.
        exponential_base: Growth factor applied per attempt.
        jitter: Randomize each delay to avoid thundering-herd retries.

    Returns:
        Whatever *func* returns on its first successful call.

    Raises:
        Exception: the last exception raised once all attempts fail.
    """

    failure = None
    attempt = 0

    while attempt <= max_retries:
        try:
            return func()
        except Exception as exc:
            failure = exc
            if attempt >= max_retries:
                break

            # Exponential growth, capped at max_delay.
            wait = min(base_delay * (exponential_base ** attempt), max_delay)

            # Scale into [0.5x, 1.5x) so concurrent clients desynchronize.
            if jitter:
                wait = wait * (0.5 + random.random())

            print(f"Attempt {attempt + 1} failed: {exc}. Retrying in {wait:.2f}s...")
            time.sleep(wait)
            attempt += 1

    raise failure

# Usage
def call_api():
    # NOTE(review): assumes a module-level `openai` client is configured
    # elsewhere; the `...` stands in for real request arguments.
    response = openai.chat.completions.create(...)
    return response

result = retry_with_backoff(call_api)

Decorator-Based Retry

# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import functools
import time
import random
from typing import Tuple, Type

def retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Build a decorator that retries the wrapped callable with backoff.

    Only exception types listed in *exceptions* trigger a retry; anything
    else propagates immediately. The delay doubles each attempt and
    carries multiplicative jitter in [0.5x, 1.5x). After the final
    failed attempt the last exception is re-raised to the caller.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    if attempt == max_retries:
                        # Out of attempts: surface the final error.
                        raise exc

                    pause = base_delay * (2 ** attempt) * (0.5 + random.random())
                    print(f"Retry {attempt + 1}/{max_retries} after {pause:.2f}s")
                    time.sleep(pause)
                    attempt += 1
        return wrapper
    return decorator

# Usage
@retry(max_retries=3, exceptions=(RateLimitError, APIError))
def call_openai(prompt: str) -> str:
    # NOTE(review): `client`, `RateLimitError`, and `APIError` are assumed to
    # be imported from the OpenAI SDK elsewhere in the script.
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

result = call_openai("Hello!")

Rate Limiter

Proactively limit your request rate:

# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter.

    Records the timestamp of each request and blocks in ``acquire()`` once
    ``requests_per_minute`` requests have been made in the last 60 seconds.

    NOTE: this is a sliding-window *log*, not a token bucket (the original
    docstring mislabeled it): it never grants bursts beyond the per-minute
    cap. Thread-safe — all bookkeeping happens under one lock, which is
    also held while sleeping, so waiting callers are serialized.
    """

    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        # Average spacing between requests (informational).
        self.interval = 60.0 / requests_per_minute
        # Monotonically ordered time.time() values of recent requests.
        self.timestamps = deque()
        self.lock = Lock()

    def acquire(self):
        """Block until a request slot is available, then claim it."""
        with self.lock:
            now = time.time()

            # Drop entries that have left the 60 s window.
            while self.timestamps and now - self.timestamps[0] > 60:
                self.timestamps.popleft()

            # At capacity: sleep until the oldest entry expires.
            if len(self.timestamps) >= self.rpm:
                sleep_time = 60 - (now - self.timestamps[0])
                if sleep_time > 0:
                    print(f"Rate limit: waiting {sleep_time:.2f}s")
                    time.sleep(sleep_time)
                    self.timestamps.popleft()

            self.timestamps.append(time.time())

    def __call__(self, func):
        """Use the limiter as a decorator: each call first acquires a slot."""
        # Local import: the snippet's own import block omits functools,
        # which would otherwise raise NameError when used standalone.
        import functools

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.acquire()
            return func(*args, **kwargs)
        return wrapper

# Usage
limiter = RateLimiter(requests_per_minute=20)

@limiter  # RateLimiter.__call__ makes the instance usable as a decorator
def call_api():
    # NOTE(review): assumes `client` is configured elsewhere.
    return client.chat.completions.create(...)

# Or manual
# NOTE(review): since call_api is already decorated, this manual acquire()
# consumes a second slot per call — use one style or the other.
limiter.acquire()
result = call_api()

Circuit Breaker

Stop calling failing services:

# script_id: day_090_rate_limits_backoffs/resilience_toolkit
import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    """Lifecycle states of a CircuitBreaker."""
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """
    Circuit breaker state machine.

    CLOSED    -> requests pass through; failures are counted.
    OPEN      -> requests are rejected until recovery_timeout elapses.
    HALF_OPEN -> probing: a success closes the circuit, a failure
                 re-opens it (the failure count is still above threshold).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        expected_exceptions: tuple = (Exception,)
    ):
        # Configuration.
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exceptions = expected_exceptions

        # Mutable state, guarded by self.lock.
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """Run *func* through the breaker.

        Raises CircuitBreakerOpen without calling *func* while the
        circuit is open; otherwise forwards *func*'s result or exception.
        """
        self._gate()
        try:
            result = func(*args, **kwargs)
        except self.expected_exceptions:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _gate(self):
        # Transition OPEN -> HALF_OPEN once the recovery window has
        # passed; otherwise reject immediately.
        with self.lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    print("Circuit half-open, testing...")
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpen("Circuit breaker is open")

    def _on_success(self):
        # Any success wipes the failure streak and closes a probing circuit.
        with self.lock:
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                print("Circuit closed (recovered)")
                self.state = CircuitState.CLOSED

    def _on_failure(self):
        # Count the failure; open the circuit once the threshold is hit.
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                print(f"Circuit opened after {self.failure_count} failures")
                self.state = CircuitState.OPEN

    def __call__(self, func):
        """Decorator form: route every call through self.call()."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper

class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""

# Usage
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

@circuit  # CircuitBreaker.__call__ routes every call through circuit.call()
def call_external_api():
    # NOTE(review): assumes `requests` is imported elsewhere in the script.
    return requests.get("https://api.example.com")

# Manual usage
try:
    result = circuit.call(call_external_api)
except CircuitBreakerOpen:
    print("Service unavailable, using fallback")
    result = fallback_response()  # NOTE(review): fallback_response is not defined in this guide

Combined Resilience

Put it all together:

# script_id: day_090_rate_limits_backoffs/resilience_toolkit
from dataclasses import dataclass
from typing import Optional, Callable
import time
import random

@dataclass
class ResilienceConfig:
    """Tunable knobs consumed by ResilientClient."""
    max_retries: int = 3                    # retries after the first attempt
    base_delay: float = 1.0                 # seconds before the first retry
    max_delay: float = 60.0                 # cap on any single backoff delay
    requests_per_minute: int = 60           # rate-limiter quota
    circuit_failure_threshold: int = 5      # failures before the circuit opens
    circuit_recovery_timeout: float = 30.0  # seconds before a half-open probe

class ResilientClient:
    """API client with rate limiting, retries, and circuit breaker."""

    def __init__(self, config: Optional[ResilienceConfig] = None):
        # Fall back to default settings when no config is supplied.
        self.config = config or ResilienceConfig()
        self.rate_limiter = RateLimiter(self.config.requests_per_minute)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=self.config.circuit_failure_threshold,
            recovery_timeout=self.config.circuit_recovery_timeout
        )

    def call(self, func: Callable, *args, **kwargs):
        """Make a resilient API call.

        Per attempt: (1) the rate limiter may block to stay under quota,
        (2) the circuit breaker may reject immediately, (3) any other
        exception triggers a retry with capped exponential backoff + jitter.

        Raises:
            CircuitBreakerOpen: immediately, without retrying.
            Exception: the last underlying error once retries are exhausted.
        """

        last_exception = None

        for attempt in range(self.config.max_retries + 1):
            try:
                # Rate limiting
                self.rate_limiter.acquire()

                # Circuit breaker
                return self.circuit_breaker.call(func, *args, **kwargs)

            except CircuitBreakerOpen:
                # Don't retry if circuit is open
                raise

            except Exception as e:
                last_exception = e

                if attempt == self.config.max_retries:
                    break

                # Exponential backoff (base 2), capped at max_delay
                delay = min(
                    self.config.base_delay * (2 ** attempt),
                    self.config.max_delay
                )
                delay *= (0.5 + random.random())  # Jitter

                print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s")
                time.sleep(delay)

        raise last_exception

# Usage
client = ResilientClient(ResilienceConfig(
    max_retries=3,
    requests_per_minute=20,
    circuit_failure_threshold=5
))

def make_api_call():
    # NOTE(review): assumes `openai_client` is configured elsewhere.
    return openai_client.chat.completions.create(...)

result = client.call(make_api_call)

Using Tenacity Library

For production, use the tenacity library:

pip install tenacity
# script_id: day_090_rate_limits_backoffs/tenacity_retry
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),  # at most 3 total attempts
    wait=wait_exponential(multiplier=1, min=1, max=60),  # exponential backoff, 1–60 s
    retry=retry_if_exception_type((RateLimitError, APIError)),  # only transient errors
    before_sleep=before_sleep_log(logger, logging.WARNING)  # log before each retry sleep
)
def call_openai(prompt: str) -> str:
    """Call OpenAI with automatic retry."""
    # NOTE(review): `client`, `RateLimitError`, and `APIError` come from the
    # OpenAI SDK and are assumed to be imported elsewhere.
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# More complex configuration
from tenacity import retry, RetryError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type(Exception),  # NOTE(review): retries *every* exception
    reraise=True  # re-raise the last error instead of wrapping it in RetryError
)
async def async_api_call():
    """Async API call with retry."""
    # NOTE(review): assumes `aiohttp` is imported and `url` is defined elsewhere.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

Summary


Quick Reference

# script_id: day_090_rate_limits_backoffs/resilience_toolkit
# Simple retry
@retry(max_retries=3, base_delay=1.0)
def api_call():
    ...

# Rate limiter
limiter = RateLimiter(requests_per_minute=60)
limiter.acquire()

# Circuit breaker
circuit = CircuitBreaker(failure_threshold=5)
result = circuit.call(api_call)

# Tenacity
# NOTE(review): this import shadows the custom @retry decorator used above;
# in a real script use only one of the two (or alias the tenacity import).
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def resilient_call():
    ...

Token Budget Management

In production, you need to prevent cost explosions. Token budgets enforce limits per user, per organization, or per time window.

# script_id: day_090_rate_limits_backoffs/token_budget
import time
from dataclasses import dataclass, field

@dataclass
class TokenBudget:
    """Track and enforce token usage limits over a sliding one-hour window."""
    max_tokens_per_hour: int = 100_000    # hourly cap across all requests
    max_tokens_per_request: int = 10_000  # cap on any single request
    # (timestamp, tokens) pairs for requests made within the last hour
    usage_log: list = field(default_factory=list)

    def _prune_expired(self) -> None:
        """Drop log entries older than one hour (shared by both queries)."""
        cutoff = time.time() - 3600
        self.usage_log = [(t, tokens) for t, tokens in self.usage_log if t > cutoff]

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True if a request of *estimated_tokens* fits both limits."""
        if estimated_tokens > self.max_tokens_per_request:
            return False

        self._prune_expired()

        # Check hourly budget
        hourly_usage = sum(tokens for _, tokens in self.usage_log)
        return (hourly_usage + estimated_tokens) <= self.max_tokens_per_hour

    def record_usage(self, tokens_used: int):
        """Record actual token usage after a request completes."""
        self.usage_log.append((time.time(), tokens_used))

    @property
    def remaining_budget(self) -> int:
        """Tokens still available in the current one-hour window."""
        self._prune_expired()
        used = sum(tokens for _, tokens in self.usage_log)
        return max(0, self.max_tokens_per_hour - used)


# Per-user budgets
user_budgets: dict[str, TokenBudget] = {}

def get_user_budget(user_id: str) -> TokenBudget:
    """Return the budget for *user_id*, creating a free-tier one on first use."""
    budget = user_budgets.get(user_id)
    if budget is None:
        budget = TokenBudget(
            max_tokens_per_hour=50_000,   # Free tier
            max_tokens_per_request=4_000
        )
        user_budgets[user_id] = budget
    return budget

# Usage in your API handler
def handle_request(user_id: str, prompt: str):
    """Enforce the caller's token budget before invoking the LLM."""
    budget = get_user_budget(user_id)
    # NOTE(review): word-count * 2 is a very rough token estimate; a real
    # tokenizer (e.g. tiktoken) would give accurate budgeting.
    estimated = len(prompt.split()) * 2  # Rough estimate

    if not budget.check_budget(estimated):
        raise Exception(f"Token budget exceeded. Remaining: {budget.remaining_budget}")

    # NOTE(review): call_llm is not defined in this guide; assumed to return an
    # object with a `.usage.total_tokens` field (OpenAI-style response).
    response = call_llm(prompt)
    budget.record_usage(response.usage.total_tokens)
    return response

Coming from Software Engineering? Token budgets are API rate limiting measured in tokens instead of requests. The same patterns — sliding window, per-user quotas, tiered plans — apply. The key difference: a single LLM request can vary from 100 to 100,000 tokens, so request-count limits alone are insufficient.


What's Next?

Now let's deploy to the cloud with AWS, GCP, or platforms like Render!