Made a mistake? Go back in time! Time-travel debugging lets you rewind your agent to any previous state and try a different path.
Coming from Software Engineering? Time-travel debugging for agents is Redux DevTools for AI. If you've used browser DevTools to step through state changes, replay actions, or inspect snapshots, this is the same power applied to agent execution. Each checkpoint is a commit you can check out, and rewinding then replaying with a different input is like running git checkout on an old commit and committing down a new branch.
What is Time-Travel Debugging?
Time-travel debugging allows you to:
- View all checkpoints in a conversation
- Rewind to any previous state
- Replay from that point with different inputs
- Debug agent decisions step by step
Setting Up Checkpointing
First, enable checkpointing to record states:
A checkpointer is just storage that auto-saves a full snapshot of your state after every node runs — like an auto-commit after each step. MemorySaver keeps snapshots in RAM (fine for experiments); SqliteSaver/PostgresSaver persist them to disk for production.
# script_id: day_045_time_travel_debugging/checkpointing_setup
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver  # or: pip install langgraph-checkpoint-sqlite
from typing import TypedDict, Annotated
from operator import add

# Define state
class AgentState(TypedDict):
    messages: Annotated[list, add]
    step: int
    decision: str

# Create checkpointer
checkpointer = MemorySaver()  # For production, use PostgresSaver or SqliteSaver

# Define nodes
def step_one(state: AgentState) -> dict:
    return {"messages": ["Step 1 complete"], "step": 1}

def step_two(state: AgentState) -> dict:
    return {"messages": ["Step 2 complete"], "step": 2}

def step_three(state: AgentState) -> dict:
    return {"messages": ["Step 3 complete"], "step": 3}

# Build graph
workflow = StateGraph(AgentState)
workflow.add_node("step1", step_one)
workflow.add_node("step2", step_two)
workflow.add_node("step3", step_three)
workflow.set_entry_point("step1")
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", "step3")
workflow.add_edge("step3", END)

# Compile WITH checkpointer
app = workflow.compile(checkpointer=checkpointer)
The Annotated[list, add] tells LangGraph how to combine updates: each node returns a partial dict, and for messages the add reducer appends to the list instead of overwriting it (other fields like step are overwritten). This is why each step accumulates messages.
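To make the merge rule concrete, here is a minimal sketch in plain Python. It is a toy model of reducer-based merging, not LangGraph's actual implementation; merge_update and the reducers dict are hypothetical names introduced only for illustration.

```python
from operator import add


def merge_update(state: dict, update: dict, reducers: dict) -> dict:
    """Toy model: merge a node's partial update into the current state.

    Keys that have a reducer are combined with the existing value;
    every other key is simply overwritten.
    """
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged


# Only "messages" has a reducer, mirroring Annotated[list, add]
reducers = {"messages": add}

state = {"messages": ["Start"], "step": 0, "decision": ""}
state = merge_update(state, {"messages": ["Step 1 complete"], "step": 1}, reducers)
state = merge_update(state, {"messages": ["Step 2 complete"], "step": 2}, reducers)

print(state["messages"])  # ['Start', 'Step 1 complete', 'Step 2 complete']
print(state["step"])      # 2
```

The list grows because add concatenates two lists, while step is replaced on every update and decision is left untouched because no update mentions it.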
Recording Checkpoints
Run your agent with a thread ID to record checkpoints:
Think of thread_id as a save-game slot or session key: every checkpoint for a run is filed under it, so re-using the same id lets you list and rewind that run's history, while a new id starts a clean timeline.
# script_id: day_045_time_travel_debugging/checkpointing_setup
# Run with thread ID
config = {"configurable": {"thread_id": "debug-session-1"}}
# This creates checkpoints at each step
result = app.invoke(
    {"messages": ["Start"], "step": 0, "decision": ""},
    config=config
)
print(f"Final state: {result}")
Viewing All Checkpoints
List all checkpoints for a thread:
# script_id: day_045_time_travel_debugging/checkpointing_setup
def list_checkpoints(checkpointer, thread_id: str):
    """List all checkpoints for a thread."""
    config = {"configurable": {"thread_id": thread_id}}
    # History is read through the compiled app defined earlier in this script
    checkpoints = list(app.get_state_history(config))
    print(f"Found {len(checkpoints)} checkpoints:")
    print("=" * 60)
    for i, checkpoint in enumerate(checkpoints):
        print(f"\nCheckpoint {i}:")
        print(f"  ID: {checkpoint.config['configurable']['checkpoint_id']}")
        print(f"  State: {checkpoint.values}")
    return checkpoints

# Usage
checkpoints = list_checkpoints(checkpointer, "debug-session-1")
Output:
Found 5 checkpoints:
============================================================
Checkpoint 0:
ID: 1ef5a8b5-...
State: {'messages': ['Start', 'Step 1 complete', 'Step 2 complete', 'Step 3 complete'], 'step': 3, ...}
Checkpoint 1:
ID: 1ef5a8b4-...
State: {'messages': [..., 'Step 2 complete'], 'step': 2, ...}
Checkpoint 2:
ID: 1ef5a8b3-...
State: {'messages': ['Start', 'Step 1 complete'], 'step': 1, ...}
Checkpoint 3:
ID: 1ef5a8b2-...
State: {'messages': ['Start'], 'step': 0, 'decision': ''}
Checkpoint 4:
ID: 1ef5a8b1-...
State: {'messages': [], ...}
Note: get_state_history returns checkpoints newest-first, so index 0 is the most recent state and higher indices go further back in time (the last entry is the empty pre-input snapshot).
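Because the ordering is easy to get backwards, it can help to pick a checkpoint by what it is about to run instead of by index. The sketch below assumes each snapshot exposes a next tuple of pending node names, as LangGraph's StateSnapshot objects do. The helpers oldest_first and find_before_node are hypothetical, and the SimpleNamespace objects are simplified stand-ins so the example runs without a graph.

```python
from types import SimpleNamespace


def oldest_first(history):
    """Return the history in chronological order (input is newest-first)."""
    return list(reversed(history))


def find_before_node(history, node_name):
    """Return the newest snapshot that is about to run `node_name`, or None."""
    for snapshot in history:
        if node_name in snapshot.next:
            return snapshot
    return None


# Stand-in snapshots (illustrative values only), ordered newest first
history = [
    SimpleNamespace(values={"step": 3}, next=()),
    SimpleNamespace(values={"step": 2}, next=("step3",)),
    SimpleNamespace(values={"step": 1}, next=("step2",)),
    SimpleNamespace(values={"step": 0}, next=("step1",)),
    SimpleNamespace(values={}, next=("__start__",)),
]

target = find_before_node(history, "step2")
print(target.values)  # {'step': 1}
print([s.values.get("step") for s in oldest_first(history)])  # [None, 0, 1, 2, 3]
```

With a real graph you would pass list(app.get_state_history(config)) in place of the stand-in list.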
Rewinding to a Previous State
Go back to any checkpoint:
# script_id: day_045_time_travel_debugging/checkpointing_setup
def rewind_to_checkpoint(app, checkpointer, thread_id: str, checkpoint_index: int):
    """Rewind to a specific checkpoint (index 0 is the newest)."""
    # Get all checkpoints
    config = {"configurable": {"thread_id": thread_id}}
    checkpoints = list(app.get_state_history(config))
    # Reject negative and out-of-range indices alike
    if not 0 <= checkpoint_index < len(checkpoints):
        raise ValueError(f"Checkpoint {checkpoint_index} not found")
    # Get the checkpoint
    target_checkpoint = checkpoints[checkpoint_index]
    checkpoint_id = target_checkpoint.config["configurable"]["checkpoint_id"]
    print(f"Rewinding to checkpoint {checkpoint_index}")
    print(f"State at that point: {target_checkpoint.values}")
    # Create config to resume from this checkpoint
    resume_config = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_id": checkpoint_id
        }
    }
    return resume_config, target_checkpoint.values

# Rewind to checkpoint 1: history is newest-first, so this is the state after step 2
resume_config, old_state = rewind_to_checkpoint(
    app, checkpointer, "debug-session-1", checkpoint_index=1
)
Replaying from a Checkpoint
Resume execution from a previous state:
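Two ways to replay from a checkpoint: pass None as the input to resume exactly where it left off (like hitting Continue in a debugger), or pass a state dict as new input. In the second case LangGraph treats the dict as fresh input: it merges the values into the checkpoint's state through the reducers (messages is appended to, while step and decision are overwritten) and runs the graph again from its entry point, branching off the checkpoint. The None is intentional, not a placeholder.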
# script_id: day_045_time_travel_debugging/checkpointing_setup
def replay_from_checkpoint(app, resume_config, new_input: dict = None):
    """Replay from a checkpoint, optionally with new input."""
    if new_input:
        # Run with new input applied on top of the checkpoint's state
        result = app.invoke(new_input, config=resume_config)
    else:
        # Just continue from checkpoint
        result = app.invoke(None, config=resume_config)
    return result

# Replay from checkpoint 1 with different input
new_result = replay_from_checkpoint(
    app,
    resume_config,
    new_input={"messages": ["Trying different approach"], "step": 1, "decision": "option_b"}
)
print(f"New result: {new_result}")
Editing State to Fork a Timeline
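# script_id: day_045_time_travel_debugging/checkpointing_setup
# Write an edited copy of the checkpoint as a new fork, then continue from the fork
fork_config = app.update_state(resume_config, {"decision": "option_b"})
result = app.invoke(None, config=fork_config)
Unlike replaying with a fresh input, update_state runs no nodes. It does not modify the stored checkpoint: it writes a new checkpoint that branches off the one you targeted and returns a config pointing at it, so pass that returned config to invoke to continue down the fork. It is like creating a branch from an old commit and committing your edit there, leaving the original history intact. Note that the update must use keys defined in the graph's state (here decision from AgentState).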
Complete Time-Travel Example
# script_id: day_045_time_travel_debugging/complete_time_travel_example
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver  # or: pip install langgraph-checkpoint-sqlite
from typing import TypedDict, Annotated
from operator import add

class DebugState(TypedDict):
    messages: Annotated[list, add]
    choice: str
    outcome: str

def make_choice(state: DebugState) -> dict:
    """Make a choice that affects the outcome."""
    choice = state.get("choice", "A")
    return {"messages": [f"Chose option {choice}"], "choice": choice}

def process_choice(state: DebugState) -> dict:
    """Process the choice."""
    choice = state["choice"]
    if choice == "A":
        outcome = "Result A - maybe not what we wanted"
    elif choice == "B":
        outcome = "Result B - better!"
    else:
        outcome = "Unknown choice"
    return {"messages": [f"Outcome: {outcome}"], "outcome": outcome}

# Build graph
workflow = StateGraph(DebugState)
workflow.add_node("choose", make_choice)
workflow.add_node("process", process_choice)
workflow.set_entry_point("choose")
workflow.add_edge("choose", "process")
workflow.add_edge("process", END)

# Compile with checkpointing
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# First run - choose A
config = {"configurable": {"thread_id": "experiment-1"}}
result1 = app.invoke(
    {"messages": ["Starting"], "choice": "A", "outcome": ""},
    config=config
)
print(f"First run: {result1['outcome']}")
# Output: "Result A - maybe not what we wanted"

# Oops! Let's go back and try B instead
# Get checkpoints (newest first)
checkpoints = list(app.get_state_history(config))
print(f"We have {len(checkpoints)} checkpoints")

# Rewind to before the choice was processed (checkpoint 1)
resume_config = {
    "configurable": {
        "thread_id": "experiment-1",
        "checkpoint_id": checkpoints[1].config["configurable"]["checkpoint_id"]
    }
}

# Replay with choice B
result2 = app.invoke(
    {"messages": ["Retrying"], "choice": "B", "outcome": ""},
    config=resume_config
)
print(f"After time travel: {result2['outcome']}")
# Output: "Result B - better!"
Debugging Workflow
Interactive Debugger
Build an interactive debugging session:
# script_id: day_045_time_travel_debugging/checkpointing_setup
class TimeTravelDebugger:
    """Interactive time-travel debugger."""

    def __init__(self, app, checkpointer, thread_id: str):
        self.app = app
        self.checkpointer = checkpointer
        self.thread_id = thread_id
        self.checkpoints = []
        self.current_index = 0
        self.refresh_checkpoints()

    def refresh_checkpoints(self):
        """Reload checkpoints from storage and point at the newest one."""
        config = {"configurable": {"thread_id": self.thread_id}}
        # get_state_history is newest-first, so index 0 is the latest state
        self.checkpoints = list(self.app.get_state_history(config))
        self.current_index = 0

    def show_checkpoints(self):
        """Display all checkpoints, newest first, without moving the cursor."""
        print(f"\n{'='*60}")
        print(f"Thread: {self.thread_id}")
        print(f"Checkpoints: {len(self.checkpoints)}")
        print(f"{'='*60}")
        for i, cp in enumerate(self.checkpoints):
            marker = " <-- current" if i == self.current_index else ""
            print(f"\n[{i}]{marker}")
            print(f"  Messages: {cp.values.get('messages', [])[-2:]}")  # Last 2

    def goto(self, index: int):
        """Move to a specific checkpoint."""
        if 0 <= index < len(self.checkpoints):
            self.current_index = index
            print(f"Moved to checkpoint {index}")
            print(f"State: {self.checkpoints[index].values}")
        else:
            print(f"Invalid index. Range: 0-{len(self.checkpoints)-1}")

    def replay(self, new_state: dict = None):
        """Replay from the current checkpoint (None resumes it unchanged)."""
        cp = self.checkpoints[self.current_index]
        config = {
            "configurable": {
                "thread_id": self.thread_id,
                "checkpoint_id": cp.config["configurable"]["checkpoint_id"]
            }
        }
        result = self.app.invoke(new_state, config=config)
        # New checkpoints were written, so reload and point at the newest
        self.refresh_checkpoints()
        return result

    def step_back(self):
        """Go back one checkpoint in time (a higher index, since 0 is newest)."""
        if self.current_index < len(self.checkpoints) - 1:
            self.current_index += 1
            print(f"Stepped back to checkpoint {self.current_index}")

    def step_forward(self):
        """Go forward one checkpoint in time (a lower index)."""
        if self.current_index > 0:
            self.current_index -= 1
            print(f"Stepped forward to checkpoint {self.current_index}")

# Usage
debugger = TimeTravelDebugger(app, checkpointer, "debug-session-1")
# Show all checkpoints
debugger.show_checkpoints()
# Go back two steps (from the newest checkpoint to index 2)
debugger.step_back()
debugger.step_back()
# Try different input (keys must exist in AgentState)
result = debugger.replay({"decision": "option_b"})
Best Practices
1. Use Meaningful Thread IDs
# script_id: day_045_time_travel_debugging/meaningful_thread_ids
# Good: Descriptive thread IDs
config = {"configurable": {"thread_id": f"user-{user_id}-task-{task_id}"}}
# Bad: Random or unclear
config = {"configurable": {"thread_id": "abc123"}}
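One way to keep thread IDs consistent is to build them in a single place. The helper below is a minimal sketch; make_thread_id is a hypothetical name, and the user-...-task-... layout simply follows the "good" example above.

```python
def make_thread_id(user_id, task_id) -> str:
    """Build a descriptive thread ID and reject empty parts."""
    parts = {"user_id": str(user_id).strip(), "task_id": str(task_id).strip()}
    for name, value in parts.items():
        if not value:
            raise ValueError(f"{name} must not be empty")
    return f"user-{parts['user_id']}-task-{parts['task_id']}"


config = {"configurable": {"thread_id": make_thread_id(42, "refund-review")}}
print(config["configurable"]["thread_id"])  # user-42-task-refund-review
```

Failing fast on an empty part avoids silently filing several runs under the same ambiguous thread.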
2. Clean Up Old Checkpoints
# script_id: day_045_time_travel_debugging/checkpointing_setup
def count_old_checkpoints(thread_id: str, keep_last: int = 10):
    """Inspect how many checkpoints exceed the retention window.

    NOTE: `MemorySaver` has no per-checkpoint `delete()` method
    (`checkpointer.delete(...)` raises AttributeError). But in current LangGraph
    (1.x) every checkpointer, including MemorySaver, exposes
    `delete_thread(thread_id)` (and async `adelete_thread`) to drop a whole
    thread's checkpoints; there is still no public API to prune individual
    checkpoints by checkpoint_id. APIs vary by version, so verify against your saver.
    """
    config = {"configurable": {"thread_id": thread_id}}
    checkpoints = list(app.get_state_history(config))
    excess = max(0, len(checkpoints) - keep_last)
    print(f"{excess} checkpoints exceed the keep_last={keep_last} window")
    # To drop old state you'd prune here, e.g.:
    # checkpointer.delete_thread(thread_id)  # drops the whole thread; works on MemorySaver too
    return excess
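If you want to see which checkpoints fall outside the window, not just how many, a newest-first history can be sliced directly. This is a small sketch for inspection or reporting only; split_by_retention is a hypothetical helper, and the strings stand in for real snapshots.

```python
def split_by_retention(history: list, keep_last: int = 10):
    """Split a newest-first history into (kept, excess)."""
    if keep_last < 0:
        raise ValueError("keep_last must be non-negative")
    return history[:keep_last], history[keep_last:]


# Stand-in history, newest first
history = [f"checkpoint-{i}" for i in range(5, 0, -1)]
kept, excess = split_by_retention(history, keep_last=3)
print(kept)    # ['checkpoint-5', 'checkpoint-4', 'checkpoint-3']
print(excess)  # ['checkpoint-2', 'checkpoint-1']
```

Slicing works here only because the history is newest-first; with a chronological list the two halves would be swapped.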
3. Log Checkpoint Events
# script_id: day_045_time_travel_debugging/log_checkpoint
import logging

def log_checkpoint(state, checkpoint_id):
    """Log checkpoint for debugging."""
    logging.info(f"Checkpoint {checkpoint_id}: step={state.get('step')}, status={state.get('status')}")
Checkpoint
Run the Complete Time-Travel Example: invoke once with choice="A" (you'll get "Result A - maybe not what we wanted"), then rewind to checkpoints[1] and re-invoke with choice="B". The second result should be "Result B - better!", showing that the replay took a different branch from the past checkpoint. If the second run still says "Result A", check that the new input actually sets choice to "B" and that resume_config carries the checkpoint_id (the checkpoints[1].config["configurable"]["checkpoint_id"] path).
Summary
Quick Reference
# script_id: day_045_time_travel_debugging/quick_reference
# Setup checkpointing
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# Run with thread ID
config = {"configurable": {"thread_id": "my-thread"}}
result = app.invoke(state, config=config)

# List checkpoints (newest first)
checkpoints = list(app.get_state_history(config))

# Rewind and replay
resume_config = {
    "configurable": {
        "thread_id": "my-thread",
        "checkpoint_id": checkpoints[2].config["configurable"]["checkpoint_id"]
    }
}
new_result = app.invoke(new_state, config=resume_config)
Exercises
- Run a multi-step graph under a fixed thread_id, then list its checkpoints with get_state_history and print the checkpoint_id and key state fields for each.
- Rewind to an earlier checkpoint and re-run from there, confirming the result differs from the original run when you change an input.
- Use update_state to edit a value at a past checkpoint (an "alternate timeline"), then continue execution and observe the branch.
- Build a tiny "A/B" experiment: replay from the same checkpoint twice with two different inputs and compare the two outcomes.
Solutions (approaches)
- Collect history = list(app.get_state_history(config)), then loop over it and print cp.config["configurable"]["checkpoint_id"] and cp.values for each cp.
- Build a resume_config that includes the target checkpoint_id, then app.invoke(new_state, config=resume_config).
- app.update_state(target_config, {"field": new_value}) writes a fork point and returns its config; pass that returned config to invoke(None, config=...) to continue from it.
- Capture cp = history[k].config, then call invoke twice with inputs X and Y, both resuming from cp; diff the returned states.
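For the last exercise, comparing the two outcomes can be as simple as a key-by-key diff of the returned state dicts. The sketch below assumes both results are plain dicts with string keys; diff_states is a hypothetical helper, and the two sample dicts are illustrative values, not captured output.

```python
def diff_states(a: dict, b: dict) -> dict:
    """Return {key: (value_in_a, value_in_b)} for every key whose value differs."""
    keys = sorted(set(a) | set(b))
    return {k: (a.get(k), b.get(k)) for k in keys if a.get(k) != b.get(k)}


# Illustrative results from two replays of the same checkpoint
run_x = {"thread": "experiment-1", "choice": "A", "outcome": "Result A - maybe not what we wanted"}
run_y = {"thread": "experiment-1", "choice": "B", "outcome": "Result B - better!"}

for key, (left, right) in diff_states(run_x, run_y).items():
    print(f"{key}: {left!r} -> {right!r}")
```

Keys with identical values (thread in this sample) are left out, so the printout shows only what the different input changed.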
What's Next?
Now let's look at agent memory and context budgeting — the three tiers of memory an agent keeps, and how to spend a fixed token window deliberately. (Durable database persistence itself is covered on Day 44.)