LangGraph Chapter 9 — Error Handling, Retries & Fallback Agents
Senior Architect Interview Series — LangGraph & Agentic AI
Navigation
← Chapter 8 — Human-in-the-Loop | Chapter 10 — Production Agents →
9.0 What This Chapter Covers
Production agents fail. APIs time out, databases go down, LLMs return garbage, tools throw exceptions. This chapter covers how to build resilient agents that degrade gracefully:
- Error categories in LLM agent systems
- Tool-level error handling (return errors as ToolMessages)
- Node-level error handling (try/except in nodes)
- Retry strategies within the ReAct loop
- Fallback agent patterns
- Rate limit and timeout handling
- Dead-letter and escalation patterns
- Error observability
9.1 Error Categories in Agent Systems
┌──────────────────────────────────────────────────────────┐
│ ERROR TAXONOMY │
│ │
│ L1 — Tool Errors │
│ ChromaDB unavailable, SQL syntax error │
│ → Handle in call_tools: return ToolMessage(error) │
│ │
│ L2 — LLM Errors │
│ Rate limit (429), API timeout, context too long │
│ → Handle in call_llm: retry with backoff │
│ │
│ L3 — State Errors │
│ Missing field, wrong type, reducer conflict │
│ → Handle at graph level: validate before invoke │
│ │
│ L4 — Logic Errors │
│ Infinite loop, wrong routing, hallucinated tool call│
│ → Handle with iteration limits, guardrails │
│ │
│ L5 — Infrastructure Errors │
│ DB down, OOM killed, network partition │
│ → Handle with checkpointer recovery, circuit breaker│
└──────────────────────────────────────────────────────────┘
9.2 L1 — Tool Error Handling
The golden rule: never raise from a tool execution loop. Return errors as ToolMessage objects so the LLM can recover.
Current Implementation in Your Project
# agent/agent.py — basic version without error handling
def call_tools(state: AgentState) -> AgentState:
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
tool_fn = tool_map[tool_call["name"]]
output = tool_fn.invoke(tool_call["args"])
results.append(ToolMessage(content=str(output), tool_call_id=tool_call["id"]))
return {"messages": results}
Hardened Version
def call_tools(state: AgentState) -> AgentState:
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
tool_call_id = tool_call["id"]
try:
# Check if tool exists
if tool_name not in tool_map:
raise ValueError(f"Unknown tool: {tool_name}. Available: {list(tool_map.keys())}")
tool_fn = tool_map[tool_name]
output = tool_fn.invoke(tool_args)
results.append(ToolMessage(
content=str(output),
tool_call_id=tool_call_id
))
except Exception as e:
# Error → ToolMessage with error description
# LLM sees this and can retry with different args or acknowledge failure
error_content = f"Tool '{tool_name}' failed: {type(e).__name__}: {str(e)}"
logger.error("tool_execution_error", extra={
"tool_name": tool_name,
"error": str(e),
"args": tool_args
})
results.append(ToolMessage(
content=error_content,
tool_call_id=tool_call_id
))
return {"messages": results}
Why this works: The LLM receives "Tool 'rag_search' failed: ConnectionError: ChromaDB is unavailable" as context. It can then:
- Retry with different parameters
- Answer from its parametric knowledge (training data)
- Tell the user it couldn't complete the search
9.3 L2 — LLM Error Handling
OpenAI API errors you'll encounter in production:
| Error Code | Meaning | Handling Strategy |
|---|---|---|
| 429 | Rate limit exceeded | Exponential backoff + retry |
| 500/503 | OpenAI server error | Retry with backoff |
| 408 | Request timeout | Retry once, then fail gracefully |
| 400 | Bad request (context too long) | Trim messages and retry |
| 401 | Invalid API key | Alert operator, fail immediately |
Retry with Exponential Backoff
import time
import random
from openai import RateLimitError, APITimeoutError, APIError
def call_llm_with_retry(state: AgentState, max_retries: int = 3) -> AgentState:
"""call_llm node with OpenAI error handling and exponential backoff."""
for attempt in range(max_retries):
try:
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
except RateLimitError as e:
if attempt == max_retries - 1:
raise # re-raise on final attempt
wait = (2 ** attempt) + random.uniform(0, 1) # exponential + jitter
logger.warning(f"Rate limit hit, waiting {wait:.1f}s (attempt {attempt+1})")
time.sleep(wait)
except APITimeoutError as e:
if attempt == max_retries - 1:
# Final attempt failed — return graceful error message
return {"messages": [AIMessage(
content="I'm having trouble connecting to the AI service. Please try again."
)]}
time.sleep(2 ** attempt)
except Exception as e:
# Unexpected error — log and fail fast
logger.error(f"Unexpected LLM error: {e}")
raise
Context Too Long (400 Error)
from openai import BadRequestError
def call_llm_with_trimming(state: AgentState) -> AgentState:
"""call_llm with automatic context trimming on context length errors."""
messages = state["messages"]
while messages:
try:
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
except BadRequestError as e:
if "context_length_exceeded" in str(e).lower():
# Remove oldest 2 messages (one exchange) and retry
if len(messages) > 2:
messages = messages[2:] # drop oldest pair
logger.warning("Context too long, trimmed 2 messages")
else:
# Only 1-2 messages and still too long — truncate content
return {"messages": [AIMessage(
content="Your message is too long for me to process. Please shorten it."
)]}
else:
raise
9.4 Iteration Limit — Preventing Infinite Loops
Add a safety counter to the state to prevent runaway agent loops:
import operator
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
iteration_count: Annotated[int, operator.add] # accumulates on each call_llm
def call_llm(state: AgentState) -> AgentState:
response = llm_with_tools.invoke(state["messages"])
return {
"messages": [response],
"iteration_count": 1 # operator.add: increments by 1 each call
}
def should_call_tools(state: AgentState) -> str:
last_message = state["messages"][-1]
# Safety: hard limit of 10 LLM calls per invocation
if state.get("iteration_count", 0) >= 10:
logger.warning("Agent hit iteration limit — forcing END")
return END
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "call_tools"
return END
9.5 Fallback Agent Patterns
When the primary agent or tool fails, a fallback provides a degraded-but-functional response.
Pattern 1 — Fallback Tool Response
If rag_search fails (ChromaDB unavailable), fall back to LLM parametric knowledge:
@tool
def rag_search(query: str) -> str:
"""Search the knowledge base for Agent Factory information."""
try:
results = retrieve(query)
if not results:
return f"No results found for '{query}'. Using general knowledge."
return build_prompt(query, results)
except Exception as e:
# Graceful fallback — tell LLM to use its knowledge
return f"Knowledge base unavailable ({type(e).__name__}). Please answer from general knowledge."
The LLM receives the fallback message and can still provide an answer from its training data with a note that the knowledge base was unavailable.
Pattern 2 — Agent-Level Fallback Node
def primary_agent_node(state: SupervisorState) -> dict:
try:
result = rag_agent.invoke({"messages": state["messages"]})
return {"messages": [AIMessage(content=result["messages"][-1].content)]}
except Exception as e:
logger.error(f"RAG agent failed: {e}")
return {"messages": [AIMessage(content="__FALLBACK__")]} # signal fallback
def should_fallback(state: SupervisorState) -> str:
last = state["messages"][-1].content
if last == "__FALLBACK__":
return "fallback_agent"
return END
def fallback_agent_node(state: SupervisorState) -> dict:
"""Simple LLM response when all else fails."""
question = state["messages"][-2].content # user's question
response = llm.invoke([
SystemMessage("You're a helpful assistant. The knowledge base is unavailable."),
HumanMessage(content=question)
])
return {"messages": [response]}
Pattern 3 — Model Fallback
Primary: expensive model. Fallback: cheaper/faster model.
class ModelFallback:
def __init__(self):
self.primary = ChatOpenAI(model="gpt-4o", temperature=0)
self.fallback = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def invoke(self, messages):
try:
return self.primary.invoke(messages)
except Exception as e:
logger.warning(f"Primary model failed ({e}), using fallback model")
return self.fallback.invoke(messages)
llm = ModelFallback()
9.6 Structured Error State
For production agents, track errors in state for observability and recovery decisions:
class RobustAgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
errors: Annotated[list[str], operator.add] # accumulate all errors
retry_count: Annotated[int, operator.add]
last_error: str | None
def call_tools(state: RobustAgentState) -> dict:
last_message = state["messages"][-1]
results = []
new_errors = []
for tool_call in last_message.tool_calls:
try:
output = tool_map[tool_call["name"]].invoke(tool_call["args"])
results.append(ToolMessage(content=str(output), tool_call_id=tool_call["id"]))
except Exception as e:
error_msg = f"{tool_call['name']}: {str(e)}"
new_errors.append(error_msg)
results.append(ToolMessage(
content=f"Error: {str(e)}",
tool_call_id=tool_call["id"]
))
update = {"messages": results}
if new_errors:
update["errors"] = new_errors
update["last_error"] = new_errors[-1]
return update
def should_call_tools(state: RobustAgentState) -> str:
# Abort if too many errors accumulated
if len(state.get("errors", [])) >= 5:
return END
# ... normal routing
9.7 Rate Limiting and Throttling
In production, you'll need to protect against:
- Your own users sending too many requests
- OpenAI rate limits affecting all users
Request-Level Rate Limiting (FastAPI)
from fastapi import HTTPException
from datetime import datetime, timedelta
import asyncio
class RateLimiter:
def __init__(self, max_requests: int, window: timedelta):
self.max_requests = max_requests
self.window = window
self.requests: dict[str, list[datetime]] = {}
def check(self, user_id: str) -> bool:
now = datetime.utcnow()
window_start = now - self.window
# Clean old requests
self.requests.setdefault(user_id, [])
self.requests[user_id] = [
t for t in self.requests[user_id] if t > window_start
]
# Check limit
if len(self.requests[user_id]) >= self.max_requests:
return False
self.requests[user_id].append(now)
return True
rate_limiter = RateLimiter(max_requests=10, window=timedelta(minutes=1))
@app.post("/chat")
async def chat(question: str, session_id: str, user_id: str = "default"):
if not rate_limiter.check(user_id):
raise HTTPException(
status_code=429,
detail="Too many requests. Please wait before sending more."
)
# ... rest of handler
9.8 Timeout Handling
Long-running agent turns must have timeouts:
import asyncio
async def run_agent_with_timeout(
question: str,
session_id: str,
db: Session,
timeout_seconds: float = 30.0
) -> str:
"""Run the agent with a hard timeout."""
try:
result = await asyncio.wait_for(
agent.ainvoke(
{"messages": history},
config={"configurable": {"thread_id": session_id}}
),
timeout=timeout_seconds
)
return result["messages"][-1].content
except asyncio.TimeoutError:
logger.warning(f"Agent timeout for session {session_id}")
return "I'm taking too long to process this request. Please try again with a simpler question."
except Exception as e:
logger.error(f"Agent error: {e}", exc_info=True)
return "I encountered an error. Please try again."
9.9 Dead-Letter Queue Pattern
For requests that consistently fail, don't keep retrying — escalate to a dead-letter queue:
class DeadLetterQueue:
def __init__(self, db: Session):
self.db = db
def push(self, session_id: str, question: str, error: str, state: dict | None = None):
"""Store a failed request for later investigation."""
record = FailedRequest(
session_id=session_id,
question=question,
error=error,
state_snapshot=json.dumps(state) if state else None,
created_at=datetime.utcnow()
)
self.db.add(record)
self.db.commit()
# Alert the team (PagerDuty, Slack, etc.)
if alert_enabled:
send_alert(f"Dead-lettered request from {session_id}: {error[:100]}")
dlq = DeadLetterQueue(db)
async def run_agent_robust(question: str, session_id: str, db: Session) -> str:
max_retries = 3
last_error = None
for attempt in range(max_retries):
try:
return await run_agent_with_timeout(question, session_id, db)
except Exception as e:
last_error = e
await asyncio.sleep(2 ** attempt) # backoff between retries
# All retries exhausted — dead-letter it
dlq.push(session_id, question, str(last_error))
return "I'm unable to process your request at this time. Our team has been notified."
9.10 Interview Q&A
Q: How do you handle tool failures in a LangGraph agent?
The key principle is: return errors as
ToolMessageobjects rather than raising exceptions. Ifrag_searchfails because ChromaDB is down, thecall_toolsnode catches the exception, creates aToolMessagewithcontent="Tool failed: ChromaDB unavailable"and the correcttool_call_id, and returns it to state. On the nextcall_llminvocation, the LLM sees this error in its context and can either retry with different parameters, fall back to its training knowledge, or politely explain to the user that the service is unavailable. This keeps the agent loop running gracefully instead of crashing mid-turn.
Q: How do you prevent an agent from entering an infinite loop?
I add an
iteration_count: Annotated[int, operator.add]field toAgentState. Thecall_llmnode returns{"iteration_count": 1}as part of its update — theoperator.addreducer accumulates it across calls. Theshould_call_toolsrouting function checks this: ifstate["iteration_count"] >= MAX_ITERATIONS(e.g., 10), it returnsENDregardless of whether tool_calls are present. This creates a hard ceiling on the number of LLM calls per invocation. For production, I also log a warning when the limit is hit — it usually indicates a tool that's returning unhelpful results that cause the LLM to keep retrying.
Q: How do you handle OpenAI rate limits in a production agent?
Rate limits need to be handled at two levels. At the application level: implement per-user request rate limiting with a sliding window counter (10 requests per minute per user) using Redis for distributed deployment. At the OpenAI API level: wrap
llm.invoke()with exponential backoff — catchRateLimitError(429), wait(2^attempt + random_jitter)seconds, and retry up to 3 times. For sustained high traffic, OpenAI Batch API and request queuing with priority lanes separate interactive (low-latency) from background (high-throughput) workloads. LangSmith's ratelimit monitoring can proactively alert you to threshold breaches.
Q: What is a dead-letter queue in the context of an agent system?
A dead-letter queue (DLQ) is a persistent store for requests that exhausted all retry attempts and still failed. Instead of returning a generic error, the system saves the full context (question, session_id, error, state snapshot) to a
FailedRequesttable and alerts the engineering team. This ensures no request silently disappears and provides all the information needed to diagnose the root cause. For an agent system, the DLQ is especially valuable because agent failures are often non-deterministic — seeing the exact state at failure time is critical for debugging.
Q: How would you implement a fallback from GPT-4o to GPT-4o-mini on failure?
Create a
ModelFallbackclass that wraps both models.invoke()tries the primary model first; onAPIError,RateLimitError, or other failures, it logs the fallback event and calls the secondary model. The fallback is transparent to the rest of the agent — it still returns a validAIMessage. This can also be implemented as aRunnableFallbackusing LangChain's LCEL:chain = primary_llm.with_fallbacks([fallback_llm]). For production, include a circuit breaker so that if the primary model has been down for > 5 minutes, all traffic goes to the fallback without attempting the primary each time.
9.11 Key One-Liners to Memorize
"Never raise from call_tools — return errors as ToolMessages and let the LLM recover."
"Iteration limit: Annotated[int, operator.add] in state, checked in the routing function."
"Exponential backoff for rate limits: wait 2^attempt + jitter, retry up to 3 times."
"Fallback agents provide degraded-but-functional responses when primary fails."
"Dead-letter queue: when retries are exhausted, persist the failure and alert the team."
"Timeout every agent invocation — an unresponsive agent is worse than a fast error."
Next: Chapter 10 — Production Agents: Streaming, Tracing & Scaling