Introduction: Why LangGraph for Production?

Every enterprise I work with goes through the same arc. They build a promising LLM prototype in a weekend — a ReAct agent, a chain of prompts, something impressive in a Jupyter notebook. Then they try to ship it. And that's where the trouble starts.

The prototype breaks when the LLM hallucinates a tool call. It has no memory between invocations. It can't recover from a transient API timeout. It has no visibility into what it decided or why. And it definitely can't be restarted mid-workflow.

In our Oracle Agentic AI training workshops (rated 4.91 / 5.0 by 200+ engineers), this gap — from impressive demo to reliable production system — is the single most common blocker we address. The answer, nine times out of ten, is LangGraph.

LangGraph is not just another LLM framework. It is a graph-based execution engine designed specifically for the problems that emerge when agents go to production: stateful workflows, deterministic branching, multi-agent coordination, and fault-tolerant recovery. In this post I'll share the patterns we teach and the lessons we've collected from real-world enterprise deployments.

📌 Prerequisites

Familiarity with Python and basic LangChain concepts (LLMs, tools, prompts). LangGraph knowledge is not required — we build from the ground up.

What is LangGraph?

LangGraph is an open-source library (part of the LangChain ecosystem) that models an AI agent as a directed graph where:

  • Nodes are Python functions that receive state and return updated state.
  • Edges define transitions between nodes — either unconditional (always go here) or conditional (go here if this condition is true).
  • State is a typed Python dataclass (or TypedDict) that flows through every node and persists across graph invocations.

This is fundamentally different from a simple LangChain LCEL pipeline. LCEL chains are acyclic — data flows in one direction, top to bottom, and there is no built-in mechanism for loops, retries, or human-in-the-loop pauses. LangGraph is explicitly designed for cyclic workflows where an agent can loop, branch, wait for external input, and resume — all with full state persistence.

┌─────────────────────────────────────────────────┐
│              LangGraph Agent Graph              │
│                                                 │
│   [START] → [Planner Node] → [Tool Router]      │
│                                   │             │
│                          ┌────────┴────────┐    │
│                          │                 │    │
│                   [Search Tool]     [Code Exec] │
│                          │                 │    │
│                          └────────┬────────┘    │
│                                   │             │
│                            [Critic Node]        │
│                              /       \          │
│                          [good]    [retry]      │
│                             │         │         │
│                           [END] [Planner Node]  │
└─────────────────────────────────────────────────┘

This graph topology gives you something priceless in production: predictable control flow. No matter what the LLM decides, execution follows edges you've defined. The LLM influences which edge is taken, but it cannot escape the graph.
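To make that guarantee concrete, here is a minimal pure-Python sketch (deliberately not the LangGraph API) of graph-constrained routing: the model proposes the next hop, but execution only ever follows edges you declared. The node names mirror the diagram above; the `choose` callable stands in for the LLM's routing decision.

```python
# Sketch of graph-constrained control flow (pure Python, not LangGraph):
# the "LLM" proposes a transition, but only declared edges are followed.
EDGES = {
    "planner": ["tool_router"],
    "tool_router": ["search_tool", "code_exec"],
    "search_tool": ["critic"],
    "code_exec": ["critic"],
    "critic": ["END", "planner"],
}

def run(start: str, choose) -> list:
    """Walk the graph; `choose` stands in for the LLM's routing decision."""
    path, node = [start], start
    while node != "END":
        options = EDGES[node]
        proposal = choose(node, options)
        # The model influences which edge is taken, but cannot escape the graph
        node = proposal if proposal in options else options[0]
        path.append(node)
    return path

# Even an adversarial chooser that always demands "shell_access"
# stays inside the declared topology.
path = run("planner", lambda node, opts: "shell_access")
```

However badly the model misbehaves, the worst case is a wrong-but-valid edge, never an undefined state.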

State Machine Architecture

The first thing we teach in our workshops is to treat your agent state as a first-class citizen. In LangGraph, state is not a dictionary you pass around informally — it's a typed schema that every node reads from and writes to.

Here's how to design state for a production agent:

Python
from typing import TypedDict, Annotated, List, Optional
import operator

class AgentState(TypedDict):
    # Input fields
    user_query: str
    session_id: str

    # Accumulating fields — use operator.add for list merging
    messages: Annotated[List[dict], operator.add]
    tool_calls: Annotated[List[dict], operator.add]

    # Control fields
    retry_count: int
    last_error: Optional[str]
    final_answer: Optional[str]

    # Observability fields
    steps_taken: int
    confidence_score: float

A few design principles we enforce in enterprise deployments:

  • Accumulate, don't overwrite lists. Use Annotated[List, operator.add] for message history and tool call logs. Overwriting lists is a common bug that causes agents to lose context mid-workflow.
  • Separate control state from business state. Fields like retry_count and last_error are infrastructure — they drive graph routing. Fields like final_answer are business outputs. Keeping them separate makes the graph logic much cleaner.
  • Make state serialisable from day one. Every field should be JSON-serialisable. This is what enables LangGraph checkpointing — the mechanism that lets your agent survive a pod restart on Kubernetes.
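The first and third principles can be illustrated together in a dependency-free sketch. LangGraph applies `Annotated` reducers internally; the hand-rolled `apply_update` below only demonstrates the merge semantics, and the final `json.dumps` is the day-one serialisability check.

```python
import json
import operator

# Hand-rolled sketch of what Annotated[List, operator.add] buys you:
# each node returns a partial update, and list fields accumulate
# rather than overwrite. (LangGraph does this merging for you.)
REDUCERS = {"messages": operator.add, "tool_calls": operator.add}

def apply_update(state: dict, update: dict) -> dict:
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        merged[key] = reducer(state[key], value) if reducer else value
    return merged

state = {"messages": [{"role": "user", "content": "hi"}], "retry_count": 0}
state = apply_update(
    state,
    {"messages": [{"role": "planner", "content": "plan drafted"}], "retry_count": 1},
)

assert len(state["messages"]) == 2   # accumulated, not overwritten
json.dumps(state)                    # serialisable state is checkpointable state
```

If that final `json.dumps` ever raises, something non-serialisable has leaked into state and checkpointing will break in production.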

⚠️ Common Pitfall

Do not store LLM objects, database connections, or open file handles in agent state. These are not serialisable. State should contain only data — connections belong in node closures or dependency injection.
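One clean way to honour this rule is a node factory that closes over the connection. The sketch below uses a hypothetical `ApiClient` as a stand-in for any real client; only plain data ever flows back into state.

```python
# Sketch: keep non-serialisable resources out of state by closing over
# them in a node factory. ApiClient is a hypothetical stand-in for any
# real client (HTTP session, DB pool, LLM wrapper, ...).
class ApiClient:
    def fetch(self, query: str) -> str:
        return f"result for {query!r}"

def make_executor_node(client: ApiClient):
    def executor_node(state: dict) -> dict:
        # The client lives in the closure; only data enters state
        return {"execution_result": client.fetch(state["user_query"])}
    return executor_node

node = make_executor_node(ApiClient())
update = node({"user_query": "list pods"})
```

The returned update is pure JSON-serialisable data, so checkpointing keeps working even though the node depends on a live connection.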

Multi-Agent Orchestration

Single agents are great for simple tasks. But production workflows — code review pipelines, document processing systems, autonomous DevOps remediation — demand more than one agent can deliver reliably. The solution is a supervisor pattern: a top-level orchestrator graph that spawns and coordinates specialised sub-agents.

In our Oracle training, we built a three-tier architecture that's become our reference implementation:

Supervisor Agent (orchestrates)
│
├── Planner Agent  → decomposes tasks, creates step-by-step plan
├── Executor Agent → runs tools, calls APIs, writes code
└── Critic Agent   → evaluates output quality, decides pass/retry

Each sub-agent is itself a compiled LangGraph. The supervisor maintains a SubgraphState that tracks which sub-agents have completed and what they returned. This gives you isolation — a failure in the Executor Agent does not crash the Planner — and composability — you can swap the Executor for a different implementation without changing the Supervisor graph.

The critical implementation detail: sub-agents communicate via structured messages, not raw strings. Define a Pydantic model for the contract between agents:

Python
from pydantic import BaseModel
from typing import List, Literal

class PlannerOutput(BaseModel):
    steps: List[str]
    estimated_complexity: Literal["low", "medium", "high"]
    requires_human_review: bool

class ExecutorOutput(BaseModel):
    result: str
    tools_used: List[str]
    execution_time_ms: int
    success: bool
    error_message: str = ""

class CriticVerdict(BaseModel):
    verdict: Literal["pass", "retry", "escalate"]
    confidence: float  # 0.0 – 1.0
    feedback: str

Using structured outputs with llm.with_structured_output(PlannerOutput) eliminates an entire class of parsing bugs and makes inter-agent communication reliable enough for production.
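The same boundary-validation idea can be shown in a dependency-free sketch. Pydantic does all of this for you; the point here is only the principle behind the `CriticVerdict` contract: parse every inter-agent message at the boundary and fail loudly on malformed output instead of letting a bad string propagate.

```python
import json

# Dependency-free sketch of the contract behind CriticVerdict:
# validate inter-agent messages at the boundary, fail loudly.
ALLOWED_VERDICTS = {"pass", "retry", "escalate"}

def parse_critic_verdict(raw: str) -> dict:
    data = json.loads(raw)                  # malformed JSON raises here
    if data.get("verdict") not in ALLOWED_VERDICTS:
        raise ValueError(f"invalid verdict: {data.get('verdict')!r}")
    if not 0.0 <= float(data.get("confidence", -1.0)) <= 1.0:
        raise ValueError("confidence must be in [0.0, 1.0]")
    return data

verdict = parse_critic_verdict(
    '{"verdict": "retry", "confidence": 0.4, "feedback": "missing tests"}'
)
```

An agent that receives a validation error knows immediately that its upstream peer misbehaved, rather than discovering it three nodes later.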

✅ Real-World Result

In our Oracle DevOps automation pilot, switching from a single monolithic agent to this three-tier architecture reduced hallucination-caused failures by 73% and increased task completion rate from 61% to 89% on complex multi-step workflows.

Error Recovery & Retries

This is where most agent implementations fall apart in production. Error handling in a LangGraph agent is not about wrapping nodes in try/except (though that's part of it). It's about designing the graph topology so that failures are a first-class concept with explicit recovery paths.

The pattern we teach is the Error Recovery Triangle:

  1. Catch — Every node that can fail wraps its operation in a try/except and writes the error into state["last_error"] rather than raising.
  2. Route — A conditional edge after the node checks state["last_error"] and routes to either the success path or the error handler node.
  3. Recover or Escalate — The error handler increments retry_count. If retry_count < MAX_RETRIES, route back to the failed node with exponential back-off; otherwise route to a graceful degradation node.

Python
import time
from langgraph.graph import StateGraph, END

MAX_RETRIES = 3

def executor_node(state: AgentState) -> dict:  # nodes return partial state updates
    try:
        result = call_external_api(state["user_query"])
        return {
            "final_answer": result,
            "last_error": None,
            "steps_taken": state["steps_taken"] + 1
        }
    except Exception as e:
        return {
            "last_error": str(e),
            "steps_taken": state["steps_taken"] + 1
        }

def error_handler_node(state: AgentState) -> dict:
    retry = state.get("retry_count", 0) + 1
    # Exponential back-off: 1s, 2s, 4s
    time.sleep(2 ** (retry - 1))
    return {"retry_count": retry}

def should_retry(state: AgentState) -> str:
    if state.get("last_error") is None:
        return "success"
    if state.get("retry_count", 0) >= MAX_RETRIES:
        return "give_up"
    return "retry"

def graceful_degradation_node(state: AgentState) -> dict:
    # Terminal fallback: return a safe, honest answer instead of crashing
    return {"final_answer": f"Unable to complete the request: {state['last_error']}"}

builder = StateGraph(AgentState)
builder.add_node("executor", executor_node)
builder.add_node("error_handler", error_handler_node)
builder.add_node("graceful_degradation", graceful_degradation_node)
builder.set_entry_point("executor")

builder.add_conditional_edges("executor", should_retry, {
    "success":  END,
    "retry":    "error_handler",
    "give_up":  "graceful_degradation"
})
builder.add_edge("error_handler", "executor")          # loop back
builder.add_edge("graceful_degradation", END)

graph = builder.compile()

This pattern — node catches, edge routes, handler recovers — is the backbone of every production agent we've deployed. Combined with LangSmith tracing, every retry is fully observable: you can see exactly which step failed, how many retries occurred, and what the final resolution was.

Production Deployment Patterns

Getting LangGraph to work in a notebook is one thing. Deploying it reliably on Kubernetes for enterprise workloads is another. Here are the four architectural decisions that matter most:

1. Checkpoint Store: SQLite → Redis

LangGraph's built-in MemorySaver is fine for development, and SqliteSaver works for single-instance deployments, but neither lets multiple replicas share state. In production, use AsyncRedisSaver (from langgraph-checkpoint-redis). Every graph invocation checkpoints state to Redis after each node, enabling:

  • Horizontal scaling — any replica can pick up a paused workflow
  • Pod crash recovery — restart from the last checkpoint, not the beginning
  • Human-in-the-loop — pause the graph, wait for approval, resume days later
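The crash-recovery property is easy to see in a sketch. The simulation below imitates per-node checkpointing with an in-memory dict standing in for Redis, and a deliberate "pod crash" mid-workflow; it is an illustration of the mechanism, not the LangGraph checkpointer API.

```python
# Sketch of per-node checkpointing: after every node, state is written to
# a store keyed by thread_id, so a crashed run resumes from the last
# completed node. A plain dict stands in for Redis here.
CHECKPOINTS: dict = {}
NODES = ["planner", "executor", "critic"]

def run(thread_id: str, state: dict, crash_after=None) -> dict:
    # Resume from the last checkpoint for this thread, if one exists
    saved = CHECKPOINTS.get(thread_id, {})
    start = saved.get("next_node", 0)
    state = saved.get("state", state)
    for i in range(start, len(NODES)):
        node = NODES[i]
        state = {**state, "completed": state.get("completed", []) + [node]}
        CHECKPOINTS[thread_id] = {"state": state, "next_node": i + 1}
        if node == crash_after:
            raise RuntimeError(f"pod died after {node}")
    return state

try:
    run("t1", {"user_query": "q"}, crash_after="executor")   # simulated crash
except RuntimeError:
    pass

# A fresh invocation resumes after "executor" instead of starting over
final = run("t1", {"user_query": "q"})
```

The second invocation re-runs only the critic, because the planner and executor results were already durably checkpointed.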

2. Containerise with Explicit Resource Limits

LangGraph agents are CPU-bound during graph compilation and I/O-bound during LLM calls. Set resource requests/limits accordingly:

YAML
resources:
  requests:
    cpu: "250m"
    memory: "512Mi"
  limits:
    cpu: "1000m"
    memory: "1Gi"

3. Health Checks via Graph Compilation

Compile your graph at startup (not on first request) and expose a /health endpoint that verifies the compiled graph object is non-null. This catches configuration errors at pod startup — before any traffic hits the agent.
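A framework-agnostic sketch of that startup pattern, with the web wiring (FastAPI route decorators, response models) omitted and `compile_fn` standing in for `builder.compile(...)`:

```python
# Sketch: compile once at startup, fail fast on misconfiguration, and
# expose a health probe that checks the compiled artefact.
COMPILED_GRAPH = None

def startup(compile_fn) -> None:
    global COMPILED_GRAPH
    # Any graph misconfiguration raises here, at pod startup,
    # before the pod is ever marked Ready
    COMPILED_GRAPH = compile_fn()

def health() -> tuple:
    if COMPILED_GRAPH is None:
        return 503, {"status": "unhealthy", "reason": "graph not compiled"}
    return 200, {"status": "ok"}

startup(lambda: object())   # stand-in for builder.compile(...)
status, body = health()
```

Pointing the Kubernetes readiness probe at this endpoint means a bad config rollout never receives traffic.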

4. Structured Logging and Tracing

Wire every node to emit a structured log line: {"node": "executor", "thread_id": "...", "step": 3, "duration_ms": 420}. Combined with LangSmith or OpenTelemetry, you get per-node latency histograms, tool call success rates, and retry frequency — the exact metrics you need to optimise agent performance in production.
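One lightweight way to do this is a decorator around every node. The sketch below uses `print` as a stand-in for a real structured logger, and mirrors the field names from the example log line above:

```python
import functools
import json
import time

# Sketch: wrap each node so every invocation emits one structured
# log line. Swap print() for your structured logger in production.
def logged(node_name: str, thread_id: str):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(state: dict) -> dict:
            start = time.perf_counter()
            update = fn(state)
            print(json.dumps({
                "node": node_name,
                "thread_id": thread_id,
                "step": state.get("steps_taken", 0) + 1,
                "duration_ms": round((time.perf_counter() - start) * 1000),
            }))
            return update
        return wrapper
    return decorator

@logged("executor", "prod-run-001")
def executor_node(state: dict) -> dict:
    return {"steps_taken": state.get("steps_taken", 0) + 1}

update = executor_node({"steps_taken": 2})
```

Because the decorator only reads state and timing, it composes with any node without changing graph behaviour.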

🚀 Stack Reference

Recommended Production Stack: LangGraph 0.2+ · Redis 7 (checkpoint store) · FastAPI (REST wrapper) · Kubernetes 1.29+ · Prometheus + Grafana (metrics) · LangSmith (LLM tracing)

Real-World Example: A Three-Node Production Agent

Below is a condensed but fully runnable LangGraph agent that implements the patterns discussed above. It has three nodes — planner, executor, and critic — with conditional routing and Redis-backed checkpointing.

Python
"""
Production LangGraph Agent — 3-Node Pattern
Requires: pip install langgraph langchain-openai langgraph-checkpoint-redis
"""
from typing import TypedDict, Annotated, List, Optional
import operator
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
import asyncio

# ── State ─────────────────────────────────────────────────────
class AgentState(TypedDict):
    user_query:     str
    messages:       Annotated[List[dict], operator.add]
    plan:           Optional[str]
    execution_result: Optional[str]
    retry_count:    int
    verdict:        Optional[str]   # "pass" | "retry" | "escalate"

# ── Structured output schemas ─────────────────────────────────
class Plan(BaseModel):
    steps: List[str]
    approach: str

class Verdict(BaseModel):
    verdict:    str   # "pass" | "retry" | "escalate"
    confidence: float
    feedback:   str

# ── LLM ──────────────────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# ── Node 1: Planner ───────────────────────────────────────────
async def planner_node(state: AgentState) -> dict:
    planner_llm = llm.with_structured_output(Plan)
    result: Plan = await planner_llm.ainvoke([
        {"role": "system", "content": "You are an expert planner. Break the task into clear steps."},
        {"role": "user", "content": state["user_query"]}
    ])
    return {
        "plan": "\n".join(f"  {i+1}. {s}" for i, s in enumerate(result.steps)),
        "messages": [{"role": "planner", "content": result.approach}]
    }

# ── Node 2: Executor ──────────────────────────────────────────
async def executor_node(state: AgentState) -> dict:
    try:
        response = await llm.ainvoke([
            {"role": "system", "content": "You are an expert executor. Follow the plan precisely."},
            {"role": "user",   "content": f"Plan:\n{state['plan']}\n\nOriginal query: {state['user_query']}"}
        ])
        return {
            "execution_result": response.content,
            "messages": [{"role": "executor", "content": response.content}]
        }
    except Exception as e:
        return {
            "execution_result": None,
            "messages": [{"role": "executor_error", "content": str(e)}],
            "verdict": "retry"
        }

# ── Node 3: Critic ────────────────────────────────────────────
async def critic_node(state: AgentState) -> dict:
    critic_llm = llm.with_structured_output(Verdict)
    result: Verdict = await critic_llm.ainvoke([
        {"role": "system", "content": "Review the execution result. Return pass, retry, or escalate."},
        {"role": "user", "content": (
            f"Query: {state['user_query']}\n"
            f"Plan: {state['plan']}\n"
            f"Result: {state['execution_result']}"
        )}
    ])
    return {
        "verdict":   result.verdict,
        # Count failed attempts so the router below can enforce the retry cap
        "retry_count": state["retry_count"] + (0 if result.verdict == "pass" else 1),
        "messages": [{"role": "critic", "content": result.feedback}]
    }

# ── Routing ────────────────────────────────────────────────────
def route_after_critic(state: AgentState) -> str:
    if state.get("verdict") == "pass":
        return "done"
    if state.get("retry_count", 0) >= 2:
        return "done"   # give up after 2 retries
    return "retry"

# ── Graph construction ─────────────────────────────────────────
builder = StateGraph(AgentState)
builder.add_node("planner",  planner_node)
builder.add_node("executor", executor_node)
builder.add_node("critic",   critic_node)

builder.set_entry_point("planner")
builder.add_edge("planner", "executor")
builder.add_edge("executor", "critic")
builder.add_conditional_edges("critic", route_after_critic, {
    "done":  END,
    "retry": "executor"   # loop back to executor, skip re-planning
})

# ── Compile with Redis checkpointing ─────────────────────────
async def main():
    async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as memory:
        graph = builder.compile(checkpointer=memory)
        config = {"configurable": {"thread_id": "prod-run-001"}}
        result = await graph.ainvoke(
            {
                "user_query": "Analyse our Kubernetes deployment logs and suggest optimisations",
                "retry_count": 0,
                "messages": []
            },
            config=config
        )
        print("Final result:", result["execution_result"])
        print("Critic verdict:", result["verdict"])
        print("Message history:", len(result["messages"]), "entries")

if __name__ == "__main__":
    asyncio.run(main())

What makes this production-ready:

  • Typed state — every field is declared upfront; no silent key errors.
  • Structured outputs — both the Planner and Critic return Pydantic models, not raw strings.
  • Retry loop — the Critic can route back to the Executor up to 2 times with full state persistence between retries.
  • Redis checkpointing — every node transition is checkpointed; the workflow can resume after any failure.
  • Thread-based isolation — the thread_id config key means concurrent users get completely isolated state.

Frequently Asked Questions

What is LangGraph and how is it different from LangChain?

LangGraph is a stateful, graph-based orchestration framework built on top of LangChain. While LangChain provides the building blocks (LLMs, tools, prompts), LangGraph adds explicit state management and a directed graph execution model — making it possible to build agents with looping, branching, and persistent memory that are essential for production use cases.

How do you handle errors and retries in a LangGraph agent?

LangGraph supports error recovery through conditional edges and retry loops within the graph. You can add a dedicated error-handler node that inspects the state, logs the failure, and either retries the failed node (with exponential back-off) up to a maximum count or routes to a graceful fallback path. Combining this with LangSmith tracing gives you full observability over every failure and retry.

Can LangGraph agents be deployed on Kubernetes?

Yes. LangGraph agents are Python applications that can be containerised with Docker and deployed on Kubernetes. Use a Deployment with at least 2 replicas, attach a Redis-backed checkpoint store for shared state across pods, expose it via a ClusterIP Service, and scale it with a HorizontalPodAutoscaler. This architecture was validated during our Oracle enterprise training workshops.
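A minimal manifest sketch of that topology — the image name and ports are placeholders, not a validated production manifest:

```yaml
# Sketch only: placeholder names, image, and ports
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-agent
spec:
  replicas: 2                      # at least 2 for availability
  selector:
    matchLabels: {app: langgraph-agent}
  template:
    metadata:
      labels: {app: langgraph-agent}
    spec:
      containers:
        - name: agent
          image: registry.example.com/langgraph-agent:1.0.0   # placeholder
          env:
            - name: REDIS_URL
              value: "redis://redis:6379"   # shared checkpoint store
          ports:
            - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-agent
spec:
  type: ClusterIP
  selector: {app: langgraph-agent}
  ports:
    - port: 80
      targetPort: 8000
```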

Conclusion & Next Steps

LangGraph is the framework that closes the gap between "impressive AI demo" and "reliable production system." The patterns in this post — typed state design, the planner-executor-critic triad, error recovery triangles, and Redis-backed Kubernetes deployment — are not theoretical. They come directly from what we teach and validate in real enterprise environments.

The engineers who master these patterns are not just better AI developers — they become the people their organisations rely on to take AI from the boardroom pilot to the production floor. And that's exactly the kind of engineer the market is demanding right now.

If you want to build these skills in a structured, hands-on environment — with real Kubernetes clusters, real LLM APIs, and real feedback from peers and instructors — our training programme is the fastest path there.

Ready to Build Production AI Agents?

Join the same hands-on programme trusted by Oracle's engineering teams — rated 4.91 / 5.0 by 200+ enterprise engineers.

5 days · Live labs · LangGraph · Multi-agent systems · Kubernetes deployment

Explore the Training Programme →