Phase 3 · Single Agent · 11 min read

Checkpoints & Persistence

Phase 3 of 8

Agents need memory! In this guide, you'll learn to save agent state, implement checkpoints, and store conversation history in databases.

Coming from Software Engineering? Agent checkpoints are like database transactions and savepoints. If you've used database savepoints for rollback, Git commits for version history, or Redux DevTools for time-travel debugging, agent checkpointing is the same concept — save state so you can restore, replay, or debug later.


Why Memory Matters

Types of memory:

  • Conversation Memory: Recent messages
  • Entity Memory: Facts about people/things
  • Summary Memory: Compressed history
  • Persistent Memory: Saved across sessions

Conversation Memory

Simple In-Memory Storage

# script_id: day_044_checkpoints_persistence/conversation_memory
from openai import OpenAI
from collections import deque

client = OpenAI()

class ConversationMemory:
    """Sliding-window conversation memory.

    Keeps at most ``max_messages`` recent messages; older ones are
    silently dropped by the bounded deque. The system prompt is prepended
    whenever the message list is built for the API.
    """

    def __init__(self, max_messages: int = 20,
                 system_prompt: str = "You are a helpful assistant."):
        # deque(maxlen=...) evicts the oldest entry automatically on overflow.
        self.messages = deque(maxlen=max_messages)
        # Generalized: callers may now supply their own system prompt;
        # the default preserves the original behavior.
        self.system_prompt = system_prompt

    def add_user_message(self, content: str):
        """Record a user turn."""
        self.messages.append({"role": "user", "content": content})

    def add_assistant_message(self, content: str):
        """Record an assistant turn."""
        self.messages.append({"role": "assistant", "content": content})

    def get_messages(self) -> list:
        """Return system prompt + current window, in API message format."""
        return [{"role": "system", "content": self.system_prompt}] + list(self.messages)

    def chat(self, user_input: str) -> str:
        """Send ``user_input`` to the model and return the assistant reply.

        Both the user message and the reply are recorded in the window.
        """
        self.add_user_message(user_input)

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=self.get_messages()
        )

        assistant_message = response.choices[0].message.content
        self.add_assistant_message(assistant_message)

        return assistant_message

# Usage: the second turn can be answered because the first is still in the window.
chat_memory = ConversationMemory(max_messages=10)
for turn in ("My name is Alice", "What's my name?"):  # Remembers!
    print(chat_memory.chat(turn))

Summary Memory

For long conversations, summarize old messages:

# script_id: day_044_checkpoints_persistence/summary_memory
class SummaryMemory:
    """Conversation memory that compresses older turns into a rolling summary.

    The last ``max_recent`` messages are kept verbatim; once
    ``summarize_after`` messages have accumulated, everything older is
    folded into ``self.summary`` via an LLM call.
    """

    def __init__(self, max_recent: int = 6, summarize_after: int = 10):
        self.recent_messages = []
        self.summary = ""
        self.max_recent = max_recent
        self.summarize_after = summarize_after
        self.message_count = 0

    def add_message(self, role: str, content: str):
        """Record one message and trigger summarization when due."""
        self.recent_messages.append({"role": role, "content": content})
        self.message_count += 1
        if self.message_count >= self.summarize_after:
            self._summarize()

    def _summarize(self):
        """Fold messages older than the recent window into the summary."""
        if len(self.recent_messages) <= self.max_recent:
            return

        # Split history: everything before the recent window gets summarized.
        older = self.recent_messages[:-self.max_recent]
        self.recent_messages = self.recent_messages[-self.max_recent:]

        summary_prompt = f"""Summarize this conversation history concisely:

Previous summary: {self.summary}

New messages:
{self._format_messages(older)}

Provide a brief summary of key points and facts mentioned."""

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": summary_prompt}],
            max_tokens=200
        )

        self.summary = response.choices[0].message.content
        # Restart the count from the messages still held verbatim.
        self.message_count = len(self.recent_messages)

    def _format_messages(self, messages: list) -> str:
        """Render messages as 'role: content', one per line."""
        rendered = (f"{m['role']}: {m['content']}" for m in messages)
        return "\n".join(rendered)

    def get_context(self) -> str:
        """Return summary (if any) plus recent messages as prompt context."""
        parts = []
        if self.summary:
            parts.append(f"Previous conversation summary:\n{self.summary}\n\n")
        parts.append("Recent messages:\n" + self._format_messages(self.recent_messages))
        return "".join(parts)

LangGraph Checkpoints

LangGraph provides built-in checkpointing:

# script_id: day_044_checkpoints_persistence/langgraph_checkpoints
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver  # or: pip install langgraph-checkpoint-sqlite
from typing import TypedDict, Annotated
from operator import add

# Define state
class AgentState(TypedDict):
    messages: Annotated[list, add]
    step_count: int

# Create checkpointer
checkpointer = MemorySaver()  # For production, use PostgresSaver or SqliteSaver

# Build graph
def agent_node(state: AgentState) -> dict:
    """One graph step: append a log entry and bump the step counter."""
    next_count = state["step_count"] + 1
    # Your agent logic here
    return {"messages": ["Processed step"], "step_count": next_count}

# Assemble a single-node graph: entry -> agent -> END.
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
builder.add_edge("agent", END)

# Compile with checkpointer so every step's state is saved
app = builder.compile(checkpointer=checkpointer)

# The thread_id identifies the persisted conversation; reuse it to resume.
config = {"configurable": {"thread_id": "user-123"}}

# First run
first_result = app.invoke(
    {"messages": ["Hello"], "step_count": 0},
    config=config
)

# Later - resume from checkpoint!
second_result = app.invoke(
    {"messages": ["Continue"], "step_count": 0},
    config=config
)
# second_result continues from the state saved at the end of the first run

Time-Travel Debugging

# script_id: day_044_checkpoints_persistence/langgraph_checkpoints
from langgraph.checkpoint.memory import MemorySaver  # or: pip install langgraph-checkpoint-sqlite

checkpointer = MemorySaver()  # For production, use PostgresSaver or SqliteSaver

# Get all checkpoints for a thread
thread_id = "user-123"
checkpoints = list(app.get_state_history({"configurable": {"thread_id": thread_id}}))

print(f"Found {len(checkpoints)} checkpoints")

# Get a specific checkpoint
for cp in checkpoints:
    print(f"Checkpoint: {cp.config}")
    print(f"State: {cp.values}")

# Replay from a specific checkpoint
old_config = checkpoints[2].config  # Go back to checkpoint 2
result = app.invoke({"messages": ["Retry from here"]}, config=old_config)

Quick Recap

| Concept | What It Does | When to Use |
| --- | --- | --- |
| Conversation Memory | Stores recent messages in a sliding window | Every chatbot — keeps context without unbounded growth |
| Summary Memory | Compresses old messages into summaries | Long conversations where full history exceeds context window |
| LangGraph Checkpoints | Saves full graph state at each step | Agents that need to resume, retry, or debug |
| Time-Travel Debugging | Replays from any previous checkpoint | Debugging agent behavior — "what went wrong at step 3?" |

Try It Yourself

  1. Build an entity memory system: Extend ConversationMemory to extract and store facts about entities (people, places, topics) mentioned in conversation. Use an LLM to extract entities from each message.

  2. Implement checkpoint cleanup: The SQLite checkpointer stores every checkpoint forever. Write a cleanup function that keeps only the last N checkpoints per thread to manage storage.

  3. Compare memory strategies: Run the same 20-message conversation with sliding window (max 5), sliding window (max 10), and summary memory. Compare the quality of the agent's responses at message 20.


What's Next

Now that you can save and restore agent state in memory, let's make it durable. In Part 2, you'll persist checkpoints to SQLite and PostgreSQL databases — so your agents survive restarts and can serve multiple users.



Database Persistence

SQLite Storage

# script_id: day_044_checkpoints_persistence/sqlite_conversation_store
import sqlite3
import json
from datetime import datetime

class ConversationStore:
    """Persist conversations and their messages in a SQLite database.

    Schema: one ``conversations`` row per conversation (metadata stored
    as JSON text) and one ``messages`` row per message, linked by
    ``conversation_id``.
    """

    def __init__(self, db_path: str = "conversations.db"):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        """Create both tables on first use (no-op when they exist)."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id TEXT PRIMARY KEY,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                metadata TEXT
            )
        """)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id TEXT,
                role TEXT,
                content TEXT,
                timestamp TIMESTAMP,
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            )
        """)
        self.conn.commit()

    def create_conversation(self, conversation_id: str, metadata: dict = None):
        """Register a new conversation; ``metadata`` is stored as JSON."""
        now = datetime.now()
        self.conn.execute(
            "INSERT INTO conversations (id, created_at, updated_at, metadata) VALUES (?, ?, ?, ?)",
            (conversation_id, now, now, json.dumps(metadata or {}))
        )
        self.conn.commit()

    def add_message(self, conversation_id: str, role: str, content: str):
        """Append a message and bump the conversation's updated_at."""
        now = datetime.now()
        self.conn.execute(
            "INSERT INTO messages (conversation_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
            (conversation_id, role, content, now)
        )
        self.conn.execute(
            "UPDATE conversations SET updated_at = ? WHERE id = ?",
            (now, conversation_id)
        )
        self.conn.commit()

    def get_messages(self, conversation_id: str, limit: int = 50) -> list:
        """Return the last ``limit`` messages in chronological order.

        Ordered by the AUTOINCREMENT id instead of the timestamp column:
        messages inserted within the same clock tick would otherwise come
        back in an arbitrary order.
        """
        cursor = self.conn.execute(
            "SELECT role, content FROM messages WHERE conversation_id = ? ORDER BY id DESC LIMIT ?",
            (conversation_id, limit)
        )
        messages = [{"role": row[0], "content": row[1]} for row in cursor.fetchall()]
        return list(reversed(messages))

    def get_conversation(self, conversation_id: str) -> dict:
        """Return conversation metadata plus its messages, or None if absent."""
        cursor = self.conn.execute(
            "SELECT id, created_at, metadata FROM conversations WHERE id = ?",
            (conversation_id,)
        )
        row = cursor.fetchone()
        if row:
            return {
                "id": row[0],
                "created_at": row[1],
                "metadata": json.loads(row[2]),
                "messages": self.get_messages(conversation_id)
            }
        return None

    def close(self):
        """Release the underlying SQLite connection."""
        self.conn.close()

# Usage
db = ConversationStore()

# One conversation row, tagged with the owning user.
db.create_conversation("conv-123", {"user_id": "user-456"})

# Two message rows linked to that conversation.
db.add_message("conv-123", "user", "Hello!")
db.add_message("conv-123", "assistant", "Hi there!")

# Fetch the conversation (metadata + messages) back out.
print(db.get_conversation("conv-123"))

PostgreSQL for Production

# script_id: day_044_checkpoints_persistence/postgres_conversation_store
import psycopg2
from psycopg2.extras import RealDictCursor
import json

class PostgresConversationStore:
    """Production-ready PostgreSQL storage for conversations and messages."""

    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)
        self._create_tables()

    def _create_tables(self):
        """Create tables and the lookup index if they don't already exist."""
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS conversations (
                    id TEXT PRIMARY KEY,
                    user_id TEXT,
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    updated_at TIMESTAMPTZ DEFAULT NOW(),
                    metadata JSONB DEFAULT '{}'
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS messages (
                    id SERIAL PRIMARY KEY,
                    conversation_id TEXT REFERENCES conversations(id),
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    tokens INTEGER,
                    timestamp TIMESTAMPTZ DEFAULT NOW()
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_messages_conversation
                ON messages(conversation_id, timestamp)
            """)
        self.conn.commit()

    def create_conversation(self, conversation_id: str, user_id: str = None,
                            metadata: dict = None):
        """Insert a conversation row.

        Required before add_message: messages carry a foreign key to
        conversations(id). Idempotent via ON CONFLICT DO NOTHING.
        """
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO conversations (id, user_id, metadata)
                VALUES (%s, %s, %s::jsonb)
                ON CONFLICT (id) DO NOTHING
            """, (conversation_id, user_id, json.dumps(metadata or {})))
        self.conn.commit()

    def add_message(self, conversation_id: str, role: str, content: str, tokens: int = None):
        """Append one message and bump the conversation's updated_at."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO messages (conversation_id, role, content, tokens)
                VALUES (%s, %s, %s, %s)
            """, (conversation_id, role, content, tokens))

            cur.execute("""
                UPDATE conversations SET updated_at = NOW()
                WHERE id = %s
            """, (conversation_id,))
        self.conn.commit()

    def get_recent_messages(self, conversation_id: str, limit: int = 20) -> list:
        """Return the last ``limit`` messages in chronological order.

        Ordered by the SERIAL id, not the timestamp: NOW() is fixed at
        transaction start in PostgreSQL, so messages inserted in the same
        transaction share a timestamp and would sort arbitrarily.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT role, content, timestamp
                FROM messages
                WHERE conversation_id = %s
                ORDER BY id DESC
                LIMIT %s
            """, (conversation_id, limit))
            return list(reversed(cur.fetchall()))

Entity Memory

Remember facts about entities:

# script_id: day_044_checkpoints_persistence/entity_memory
from openai import OpenAI
import json

client = OpenAI()

class EntityMemory:
    """Extract facts about entities from text and accumulate them.

    Internal shape: ``{entity_name: {fact_type: fact_value}}`` — later
    facts about the same entity are merged into its existing dict.
    """

    def __init__(self):
        self.entities = {}

    def extract_entities(self, text: str) -> dict:
        """Ask the model for a JSON mapping of entity -> {fact: value}."""
        prompt = f"""Extract entities and facts from this text.
Return JSON: {{"entity_name": {{"fact_type": "fact_value"}}}}

Text: {text}

Examples of entities: people, places, organizations
Examples of facts: name, age, location, role, relationship

JSON:"""

        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        return json.loads(completion.choices[0].message.content)

    def update(self, text: str):
        """Merge newly extracted facts into the entity store."""
        for name, facts in self.extract_entities(text).items():
            # setdefault creates the entity record on first sighting.
            self.entities.setdefault(name, {}).update(facts)

    def get_context(self) -> str:
        """Render known entities as a bulleted context block for prompts."""
        if not self.entities:
            return ""

        lines = ["Known information:"]
        for name, facts in self.entities.items():
            rendered = ", ".join(f"{k}: {v}" for k, v in facts.items())
            lines.append(f"- {name}: {rendered}")
        return "\n".join(lines) + "\n"

# Usage: each update() call extracts facts and merges them into the store.
entity_mem = EntityMemory()
for sentence in (
    "John Smith is a 35-year-old software engineer from Boston.",
    "John's wife Sarah is a doctor at MGH.",
):
    entity_mem.update(sentence)

print(entity_mem.get_context())
# Known information:
# - John Smith: age: 35, occupation: software engineer, location: Boston
# - Sarah: relationship: John's wife, occupation: doctor, workplace: MGH

Complete Persistent Agent

# script_id: day_044_checkpoints_persistence/persistent_agent
from openai import OpenAI
import sqlite3
import json
from datetime import datetime
import uuid

client = OpenAI()

class PersistentAgent:
    """Chat agent whose sessions, messages, and entity facts live in SQLite.

    State survives process restarts: resuming a conversation only
    requires the session id returned by create_session().
    """

    def __init__(self, db_path: str = "agent.db"):
        self.conn = sqlite3.connect(db_path)
        self._setup_db()

    def _setup_db(self):
        """Create the sessions/messages/entities tables if missing."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                user_id TEXT,
                created_at TIMESTAMP,
                summary TEXT
            );

            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                role TEXT,
                content TEXT,
                timestamp TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS entities (
                session_id TEXT,
                entity TEXT,
                facts TEXT,
                PRIMARY KEY (session_id, entity)
            );
        """)
        self.conn.commit()

    def create_session(self, user_id: str) -> str:
        """Create a session for ``user_id`` and return its UUID."""
        session_id = str(uuid.uuid4())
        self.conn.execute(
            "INSERT INTO sessions (id, user_id, created_at) VALUES (?, ?, ?)",
            (session_id, user_id, datetime.now())
        )
        self.conn.commit()
        return session_id

    def chat(self, session_id: str, user_input: str) -> str:
        """Run one turn: persist the input, build context, persist the reply."""
        self._save_message(session_id, "user", user_input)

        # Context = recent message window + stored entities + session summary.
        messages = self._get_messages(session_id)
        entities = self._get_entities(session_id)
        summary = self._get_summary(session_id)

        system_content = "You are a helpful assistant."
        if summary:
            system_content += f"\n\nPrevious conversation summary:\n{summary}"
        if entities:
            system_content += f"\n\nKnown information:\n{entities}"

        full_messages = [
            {"role": "system", "content": system_content}
        ] + messages

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=full_messages
        )

        assistant_message = response.choices[0].message.content

        self._save_message(session_id, "assistant", assistant_message)

        # NOTE(review): `messages` is the truncated window (<= limit), so once
        # the window is full this fires every turn — harmless while
        # _update_entities is a stub; revisit when implementing it.
        if len(messages) % 5 == 0:
            self._update_entities(session_id, user_input)

        return assistant_message

    def _save_message(self, session_id: str, role: str, content: str):
        """Append one message row for the session."""
        self.conn.execute(
            "INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
            (session_id, role, content, datetime.now())
        )
        self.conn.commit()

    def _get_messages(self, session_id: str, limit: int = 10) -> list:
        """Return the last ``limit`` messages in chronological order.

        Ordered by the AUTOINCREMENT id instead of the timestamp column:
        rows written within the same clock tick would otherwise come back
        in an arbitrary order.
        """
        cursor = self.conn.execute(
            "SELECT role, content FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?",
            (session_id, limit)
        )
        return [{"role": r[0], "content": r[1]} for r in reversed(cursor.fetchall())]

    def _get_entities(self, session_id: str) -> str:
        """Render stored entity facts as '- entity: facts' lines ('' if none)."""
        cursor = self.conn.execute(
            "SELECT entity, facts FROM entities WHERE session_id = ?",
            (session_id,)
        )
        entities = cursor.fetchall()
        if not entities:
            return ""
        return "\n".join([f"- {e[0]}: {e[1]}" for e in entities])

    def _get_summary(self, session_id: str) -> str:
        """Return the session's stored summary, or '' when unset."""
        cursor = self.conn.execute(
            "SELECT summary FROM sessions WHERE id = ?",
            (session_id,)
        )
        row = cursor.fetchone()
        return row[0] if row and row[0] else ""

    def _update_entities(self, session_id: str, text: str):
        """Extract entity facts from ``text`` and upsert them (stub).

        Plug in an EntityMemory-style extraction here.
        """
        pass

# Usage
assistant = PersistentAgent()

# Create session
session_id = assistant.create_session("user-123")

# Every turn is written to SQLite before and after the model call.
print(assistant.chat(session_id, "Hi, I'm Alice and I work at Google"))
print(assistant.chat(session_id, "What's my name?"))  # Remembers!

# Later - resume session
print(assistant.chat(session_id, "Where do I work?"))  # Still remembers!

Summary

You built sliding-window, summary, and entity memory; persisted conversations to SQLite and PostgreSQL; and used LangGraph checkpoints to resume runs and time-travel-debug them — everything an agent needs to remember across sessions.

What's Next?

You've mastered single agents! Next month, we'll explore Multi-Agent Systems - teams of agents working together!