Overview

Agent performance depends on multiple factors: model latency, tool execution time, network I/O, and execution patterns. This guide covers proven optimization techniques for production deployments.

Model Response Caching

Cache identical requests to reduce costs and latency:
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
from langchain_openai import ChatOpenAI

# Enable global LLM caching
set_llm_cache(InMemoryCache())

model = ChatOpenAI(model="gpt-4")

# First call: hits API
response1 = model.invoke("What is 2+2?")

# Second identical call: served from cache (instant)
response2 = model.invoke("What is 2+2?")
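Cache vs Prompt Caching: This is LangChain’s application-level cache, which returns a stored response when the prompt and model configuration match exactly. It is distinct from provider-side prompt caching (offered by Anthropic, OpenAI, and others), which reuses the processing of a repeated prompt prefix on the provider's servers but still generates a fresh response.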

Custom Cache Implementations

Extend BaseCache for Redis, database, or file-based caching:
from langchain_core.caches import BaseCache
from langchain_core.outputs import Generation
import hashlib
import json
import redis

class RedisCache(BaseCache):
    """Redis-backed LLM cache for distributed systems."""
    
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
    
    def _make_key(self, prompt: str, llm_string: str) -> str:
        """Generate cache key from prompt and model config."""
        # Built-in hash() is salted per process, so keys would not match
        # across workers; a SHA-256 digest is stable everywhere.
        digest = hashlib.sha256(f"{prompt}\x00{llm_string}".encode()).hexdigest()
        return f"llm_cache:{digest}"
    
    def lookup(self, prompt: str, llm_string: str) -> list[Generation] | None:
        """Look up cached response."""
        key = self._make_key(prompt, llm_string)
        cached = self.redis.get(key)
        
        if cached:
            data = json.loads(cached)
            return [Generation(**gen) for gen in data]
        return None
    
    def update(self, prompt: str, llm_string: str, return_val: list[Generation]) -> None:
        """Store response in cache."""
        key = self._make_key(prompt, llm_string)
        data = [gen.dict() for gen in return_val]
        self.redis.setex(key, self.ttl, json.dumps(data))
    
    def clear(self, **kwargs) -> None:
        """Clear all cached responses."""
        for key in self.redis.scan_iter("llm_cache:*"):
            self.redis.delete(key)

# Enable Redis caching
set_llm_cache(RedisCache(redis_url="redis://localhost:6379", ttl=7200))

Cache Parameters

  • prompt (str): Serialized prompt string. For chat models, this includes message history.
  • llm_string (str): Serialized model configuration (model name, temperature, max tokens, etc.). Ensures different configs use separate cache entries.
  • return_val (list[Generation]): List of Generation objects to cache. Contains text, metadata, and token usage.
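
To make the role of the two key parameters concrete, here is a minimal sketch of an exact-match cache keyed on both values. The names `make_cache_key` and `DictCache` are hypothetical and not part of LangChain, and the `llm_string` values are simplified stand-ins for the real serialized configuration.

```python
import hashlib
from typing import Dict, List, Optional

def make_cache_key(prompt: str, llm_string: str) -> str:
    """Build a deterministic key from the prompt and the model config."""
    digest = hashlib.sha256()
    digest.update(prompt.encode("utf-8"))
    digest.update(b"\x00")  # separator so ("ab", "c") differs from ("a", "bc")
    digest.update(llm_string.encode("utf-8"))
    return f"llm_cache:{digest.hexdigest()}"

class DictCache:
    """Toy in-memory cache (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self._store: Dict[str, List[str]] = {}

    def lookup(self, prompt: str, llm_string: str) -> Optional[List[str]]:
        return self._store.get(make_cache_key(prompt, llm_string))

    def update(self, prompt: str, llm_string: str, return_val: List[str]) -> None:
        self._store[make_cache_key(prompt, llm_string)] = return_val

cache = DictCache()
cache.update("What is 2+2?", "model=gpt-4,temperature=0", ["4"])

# Same prompt and config: hit. Same prompt, different config: miss.
hit = cache.lookup("What is 2+2?", "model=gpt-4,temperature=0")
miss = cache.lookup("What is 2+2?", "model=gpt-4,temperature=1")
```

Because the configuration is part of the key, changing the temperature or the model name never serves a response that was generated under different settings.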

Async Execution

Use async patterns for concurrent tool execution and reduced latency:
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
import aiohttp

@tool
async def fetch_weather(city: str) -> str:
    """Get weather for a city."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.weather.com/{city}") as resp:
            data = await resp.json()
            return data["description"]

@tool
async def fetch_news(topic: str) -> str:
    """Get latest news for a topic."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.news.com/{topic}") as resp:
            data = await resp.json()
            return data["headline"]

agent = create_agent(
    model="openai:gpt-4",
    tools=[fetch_weather, fetch_news],  # Both async
)

# Async invocation enables concurrent tool execution
response = await agent.ainvoke({
    "messages": [HumanMessage("Get weather in SF and latest tech news")]
})
When the model calls multiple tools in one turn, async tools execute concurrently, so the total tool time approaches that of the slowest call rather than the sum of all calls.

Batching Requests

Process multiple inputs efficiently with batching:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

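messages = ["What is 2+2?", "Name a prime number.", "Define latency."]

# Sequential (slow): N API calls, one after another
results = [model.invoke(msg) for msg in messages]

# Batched (fast): still N API calls, but issued in parallel from a thread pool
results = model.batch(messages)

# Async batching: the same parallelism without blocking the event loop
results = await model.abatch(messages)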
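
As a rough mental model of what batching buys you, the sketch below fans a blocking call out over a thread pool while capping concurrency and preserving input order. `fake_model_call` and `batch_calls` are hypothetical stand-ins rather than LangChain APIs, and the 0.2 second sleep is an assumed latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_model_call(prompt: str) -> str:
    """Stand-in for a blocking model request (hypothetical)."""
    time.sleep(0.2)  # simulated network latency
    return prompt.upper()

def batch_calls(prompts, max_concurrency=4):
    """Run the calls on worker threads; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(fake_model_call, prompts))

prompts = ["a", "b", "c", "d"]

start = time.perf_counter()
batched = batch_calls(prompts)
batched_seconds = time.perf_counter() - start
# Sequentially these four calls would take about 0.8s in total.
```

The concurrency cap matters in practice: too high a value can trip provider rate limits. In LangChain the comparable setting is the `max_concurrency` key of the run config passed to `batch()`.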

Agent Batching

Batch multiple agent invocations:
agent = create_agent(model="openai:gpt-4", tools=[search_tool])

inputs = [
    {"messages": [HumanMessage("Weather in NYC?")]},
    {"messages": [HumanMessage("Capital of France?")]},
    {"messages": [HumanMessage("2+2=")]},
]

# Process all inputs in parallel
results = await agent.abatch(inputs)

Streaming for Perceived Performance

Stream responses to improve perceived latency:
async for chunk in agent.astream(
    {"messages": [HumanMessage("Explain quantum computing")]}
):
    print(chunk, end="", flush=True)
Stream mode options:
async for mode, event in agent.astream(
    {"messages": [HumanMessage("Hello")]},
    stream_mode=["messages", "updates", "custom"],
):
    if mode == "messages":
        # Message chunks arrive as (chunk, metadata) tuples (fastest feedback)
        chunk, metadata = event
        print(chunk.content, end="")
    elif mode == "updates":
        # State updates after each node
        print(f"Node update: {event}")
    elif mode == "custom":
        # Custom events from middleware
        print(f"Status: {event}")
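
Streaming does not shorten the total generation time; it shortens the wait before the first output appears. The sketch below measures both with a simulated token stream. `fake_token_stream` is a hypothetical stand-in for a model stream, and the 0.05 second per-token delay is an assumption.

```python
import asyncio
import time

async def fake_token_stream(text: str, delay: float = 0.05):
    """Yield one word at a time (hypothetical stand-in for a model stream)."""
    for word in text.split():
        await asyncio.sleep(delay)
        yield word + " "

async def consume(text: str):
    """Return (full_text, time_to_first_chunk, total_time)."""
    start = time.perf_counter()
    first_chunk_at = None
    parts = []
    async for chunk in fake_token_stream(text):
        if first_chunk_at is None:
            first_chunk_at = time.perf_counter() - start
        parts.append(chunk)
    return "".join(parts), first_chunk_at, time.perf_counter() - start

full_text, time_to_first, total_time = asyncio.run(
    consume("streaming improves perceived latency a lot")
)
```

Time to first chunk is usually the metric worth tracking for interactive UIs, since that is the delay users actually notice.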

Parallel Tool Execution

LangChain automatically executes multiple tool calls concurrently when using async:
# If LLM returns multiple tool calls in one response:
# [
#   {"name": "search", "args": {"query": "Python"}},
#   {"name": "weather", "args": {"city": "NYC"}},
#   {"name": "news", "args": {"topic": "AI"}},
# ]
# All three execute concurrently with async tools!

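import asyncio
from langchain_core.tools import tool

@tool
async def search(query: str) -> str:
    """Search the web."""
    await asyncio.sleep(2)  # stands in for a 2 second API call
    return f"Results for {query}"

@tool
async def weather(city: str) -> str:
    """Get weather."""
    await asyncio.sleep(1)  # stands in for a 1 second API call
    return f"Conditions in {city}"

@tool
async def news(topic: str) -> str:
    """Get news."""
    await asyncio.sleep(3)  # stands in for a 3 second API call
    return f"Articles about {topic}"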

# Sequential: 2s + 1s + 3s = 6 seconds total
# Concurrent: max(2s, 1s, 3s) = 3 seconds total (2x faster!)
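
The arithmetic above can be checked with plain `asyncio`, independent of any agent framework. This is a minimal sketch with the delays scaled down by a factor of ten; `fake_tool` is a hypothetical stand-in for an I/O-bound tool.

```python
import asyncio
import time

async def fake_tool(name: str, seconds: float) -> str:
    """Simulate an I/O-bound tool call (hypothetical)."""
    await asyncio.sleep(seconds)
    return f"{name} done"

async def run_concurrently():
    start = time.perf_counter()
    # gather() starts all three coroutines before awaiting any of them
    results = await asyncio.gather(
        fake_tool("search", 0.2),
        fake_tool("weather", 0.1),
        fake_tool("news", 0.3),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(run_concurrently())
# elapsed is close to the slowest call (0.3s), not the 0.6s sum
```

The speedup only applies when the tools actually yield to the event loop. A tool that performs blocking I/O inside an `async def` will still serialize the others.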

Model Configuration Tuning

Optimize model parameters for your use case:
from langchain_openai import ChatOpenAI

# Fast responses (lower quality)
fast_model = ChatOpenAI(
    model="gpt-4o-mini",  # Smaller, faster model
    temperature=0.3,  # More deterministic output (temperature does not affect speed)
    max_tokens=500,  # Limit output length
)

# High quality (slower)
quality_model = ChatOpenAI(
    model="gpt-4o",
    temperature=0.7,
    max_tokens=2000,
)

# Use fast model for simple tasks, quality model for complex ones
from langchain.agents.middleware import wrap_model_call

@wrap_model_call
def adaptive_model_selection(request, handler):
    """Use fast model for simple queries."""
    user_message = request.messages[-1].content
    
    if len(user_message) < 50 and "?" in user_message:
        # Simple question: use fast model
        return handler(request.override(model=fast_model))
    else:
        # Complex task: use quality model
        return handler(request)
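
The routing rule is easier to tune when it lives in a plain function that can be tested without calling a model. A minimal sketch of the same heuristic follows; `pick_model` and its labels are hypothetical names, and the 50-character threshold is taken from the example above.

```python
def pick_model(user_message: str, max_simple_chars: int = 50) -> str:
    """Return "fast" for short questions and "quality" for everything else."""
    is_short = len(user_message) < max_simple_chars
    is_question = "?" in user_message
    return "fast" if is_short and is_question else "quality"

examples = [
    "What is 2+2?",
    "Summarize this report and list the risks",
    "Can you compare these three database designs and explain the trade-offs in detail?",
]
routes = {text: pick_model(text) for text in examples}
```

Message length is a crude proxy for difficulty, so it is worth checking the routing decisions against a labeled set of real queries before relying on them.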

Middleware Caching

Implement custom caching with middleware:
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain.agents.middleware.types import ModelRequest, ModelResponse
import hashlib
import json

class ModelCacheMiddleware(AgentMiddleware):
    """Cache model responses based on messages."""
    
    def __init__(self):
        super().__init__()
        self.cache = {}
    
    def _make_key(self, messages: list) -> str:
        """Generate cache key from message roles and contents."""
        content = json.dumps([[m.type, m.content] for m in messages])
        return hashlib.sha256(content.encode()).hexdigest()
    
    def wrap_model_call(self, request: ModelRequest, handler) -> ModelResponse:
        key = self._make_key(request.messages)
        
        # Check cache
        if key in self.cache:
            print(f"Cache hit for key {key[:8]}...")
            return self.cache[key]
        
        # Execute and cache
        response = handler(request)
        self.cache[key] = response
        print(f"Cached response for key {key[:8]}...")
        return response

cache = ModelCacheMiddleware()

agent = create_agent(
    model="openai:gpt-4",
    middleware=[cache],
)
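
A plain dict never evicts anything, so a long-running process will see it grow without bound. One possible alternative is a small LRU store with per-entry expiry, sketched below. `BoundedTTLCache` is a hypothetical helper and not a LangChain class; the injectable `clock` exists only to make the behavior easy to test.

```python
import time
from collections import OrderedDict

class BoundedTTLCache:
    """LRU cache with per-entry expiry (hypothetical helper)."""

    def __init__(self, max_entries=1000, ttl_seconds=3600.0, clock=time.monotonic):
        self._entries = OrderedDict()  # key -> (expires_at, value)
        self._max_entries = max_entries
        self._ttl = ttl_seconds
        self._clock = clock

    def get(self, key):
        item = self._entries.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self._clock() >= expires_at:
            del self._entries[key]  # entry has expired
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key, value):
        self._entries[key] = (self._clock() + self._ttl, value)
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used

# Usage with a fake clock so expiry can be shown without sleeping
now = [0.0]
cache = BoundedTTLCache(max_entries=2, ttl_seconds=10.0, clock=lambda: now[0])
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # touch "a" so "b" becomes the eviction candidate
cache.put("c", 3)  # exceeds max_entries and evicts "b"
```

Swapping this in for the dict would mean calling `get` and `put` in place of the `in` check and the assignment inside the middleware.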

Connection Pooling

Reuse HTTP connections for better performance:
import httpx
from langchain_openai import ChatOpenAI

# Create persistent HTTP client
client = httpx.Client(
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
    ),
    timeout=httpx.Timeout(30.0),
)

# Models share the client
model = ChatOpenAI(
    model="gpt-4",
    http_client=client,  # Reused for sync calls; for ainvoke/astream/abatch,
                         # pass an httpx.AsyncClient via http_async_client
)

Monitoring Performance

Track performance metrics with middleware:
from langchain.agents.middleware import AgentMiddleware
import time
import statistics

class PerformanceMonitor(AgentMiddleware):
    """Monitor agent execution performance."""
    
    def __init__(self):
        super().__init__()
        self.model_times = []
        self.tool_times = []
    
    def wrap_model_call(self, request, handler):
        # perf_counter() is monotonic, so it is safer than time() for durations
        start = time.perf_counter()
        response = handler(request)
        duration = time.perf_counter() - start
        
        self.model_times.append(duration)
        return response
    
    def wrap_tool_call(self, request, handler):
        start = time.perf_counter()
        result = handler(request)
        duration = time.perf_counter() - start
        
        self.tool_times.append(duration)
        return result
    
    @staticmethod
    def _p95(times):
        # statistics.quantiles() raises StatisticsError with fewer than two samples
        if len(times) < 2:
            return times[0]
        return statistics.quantiles(times, n=20)[18]
    
    def after_agent(self, state, runtime):
        if self.model_times:
            print("\n=== Performance Report ===")
            print(f"Model calls: {len(self.model_times)}")
            print(f"  Avg: {statistics.mean(self.model_times):.2f}s")
            print(f"  P95: {self._p95(self.model_times):.2f}s")
        
        if self.tool_times:
            print(f"Tool calls: {len(self.tool_times)}")
            print(f"  Avg: {statistics.mean(self.tool_times):.2f}s")
            print(f"  P95: {self._p95(self.tool_times):.2f}s")

monitor = PerformanceMonitor()

agent = create_agent(
    model="openai:gpt-4",
    tools=[search_tool],
    middleware=[monitor],
)
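
The P95 figures in the report come from `statistics.quantiles(..., n=20)[18]`. With `n=20` the function returns 19 cut points at 5% steps, so index 18 is the 95th percentile. The standalone sketch below shows this on synthetic data; the `p95` helper and the sample values are illustrative assumptions.

```python
import statistics

def p95(samples):
    """95th percentile; returns the only sample when there is just one."""
    if not samples:
        raise ValueError("p95 requires at least one sample")
    if len(samples) < 2:
        return samples[0]
    return statistics.quantiles(samples, n=20)[18]

# Synthetic latencies: 1.0, 2.0, ..., 100.0 (illustrative data only)
latencies = [float(value) for value in range(1, 101)]

cut_points = statistics.quantiles(latencies, n=20)
p95_latency = p95(latencies)  # falls between 95.0 and 96.0 for this data
```

With only a handful of samples the estimate is dominated by one or two slow calls, so percentile figures become meaningful only after a reasonable number of runs.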

Best Practices

Measure where time is actually spent:
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()

# Run agent
response = agent.invoke({"messages": [HumanMessage("Hello")]})

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 functions by cumulative time
Evaluate if smaller, faster models meet your quality bar:
# Test against your evaluation set
models_to_test = [
    "gpt-4o-mini",      # Fastest
    "gpt-4o",           # Balanced
    "gpt-4-turbo",      # High quality
]

# evaluate() and test_cases are placeholders for your own evaluation harness
for model_name in models_to_test:
    model = ChatOpenAI(model=model_name)
    results = evaluate(model, test_cases)
    print(f"{model_name}: {results['accuracy']:.2%} accuracy, {results['avg_latency']:.2f}s")
Cache at multiple levels:
# 1. Model-level cache (built-in)
set_llm_cache(RedisCache(...))

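# 2. Tool-level cache (custom)
import functools

@functools.lru_cache(maxsize=1000)
def _cached_search(query: str) -> str:
    return api.search(query)

@tool
def expensive_api_call(query: str) -> str:
    """Search the API, caching results per query."""  # @tool needs a docstring
    return _cached_search(query)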

# 3. Agent-level cache (middleware)
agent = create_agent(
    model=model,
    middleware=[ModelCacheMiddleware()],
)
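
For the tool-level layer, `functools.lru_cache` also reports hit and miss counts, which helps confirm that the cache is actually being used. The sketch below uses a hypothetical `cached_lookup` function in place of a real API call.

```python
import functools

calls = []  # records every real (uncached) execution

@functools.lru_cache(maxsize=1000)
def cached_lookup(query: str) -> str:
    """Stand-in for an expensive, deterministic API call (hypothetical)."""
    calls.append(query)
    return f"result for {query}"

cached_lookup("python")
cached_lookup("python")  # identical argument: served from the cache
cached_lookup("rust")

info = cached_lookup.cache_info()  # CacheInfo(hits, misses, maxsize, currsize)
```

Note that `lru_cache` has no expiry and requires hashable arguments, so it suits tools whose results stay valid for the life of the process. Data that changes over time needs a cache with a TTL instead.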
Prevent unbounded execution:
import asyncio

# Timeout for entire agent execution
try:
    response = await asyncio.wait_for(
        agent.ainvoke({"messages": [HumanMessage("Hello")]}),
        timeout=30.0,  # 30 second max
    )
except asyncio.TimeoutError:
    print("Agent execution timed out")

# Timeout for individual tools
@tool
async def search_with_timeout(query: str) -> str:
    """Search with 10 second timeout."""
    try:
        return await asyncio.wait_for(
            api.search(query),
            timeout=10.0,
        )
    except asyncio.TimeoutError:
        return "Search timed out"

Common Performance Pitfalls

Avoid These Mistakes:
  1. Sync tools with async agent: Implement async versions of I/O-bound tools
  2. No caching: Repeated identical calls waste money and time
  3. Sequential execution: Use abatch() instead of loops
  4. Large context windows: Trim conversation history to essential messages
  5. No timeouts: Implement timeouts at tool and agent levels
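
For the fourth pitfall, a simple way to bound context size is to keep the system prompt plus only the most recent turns. The sketch below uses plain dicts with a `role` key as an assumed message shape; `trim_history` is a hypothetical helper.

```python
def trim_history(messages, max_recent=4):
    """Keep all system messages plus the last `max_recent` other messages."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    # rest[-0:] would return everything, so handle zero explicitly
    recent = rest[-max_recent:] if max_recent > 0 else []
    return system + recent

history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello! How can I help?"},
    {"role": "user", "content": "What is caching?"},
    {"role": "assistant", "content": "Storing results for reuse."},
    {"role": "user", "content": "And batching?"},
]

trimmed = trim_history(history, max_recent=2)
```

A cut by message count can separate a tool call from its result, which some providers reject. LangChain ships a `trim_messages` helper in `langchain_core.messages` that is worth evaluating for production use.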

Performance Checklist

  • Enable LLM caching for repeated queries
  • Implement async tools for I/O operations
  • Use abatch() for multiple inputs
  • Stream responses for better UX
  • Set appropriate timeouts
  • Monitor latency with middleware
  • Profile to identify bottlenecks
  • Use smaller models where quality permits
  • Implement connection pooling
  • Cache tool results when appropriate

Next Steps

Middleware System

Build performance monitoring middleware

Rate Limiting

Balance performance with API quotas

Custom Tools

Optimize tool implementations