
Production Patterns - Scaling LangChain Systems

Learn enterprise-grade patterns for deploying, scaling, and maintaining LangChain applications in production environments

🚀 Production Readiness Overview

Building production LangChain applications requires careful consideration of scalability, reliability, monitoring, and maintenance. This guide covers battle-tested patterns for enterprise deployment.

πŸ—οΈ Production System Architecture ​

text
                    🏭 PRODUCTION LANGCHAIN ARCHITECTURE 🏭
                         (Enterprise-grade deployment)

    ┌────────────────────────────────────────────────────────────────┐
    │                          CLIENT LAYER                          │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
    │  │   Web App   │  │ Mobile App  │  │      API Clients        │ │
    │  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
    └──────────────────────────────┬─────────────────────────────────┘
                                   │
    ┌──────────────────────────────▼─────────────────────────────────┐
    │                          API GATEWAY                           │
    │  • Authentication   • Rate Limiting   • Load Balancing         │
    │  • Request Routing  • SSL Termination  • CORS Handling         │
    └──────────────────────────────┬─────────────────────────────────┘
                                   │
    ┌──────────────────────────────▼─────────────────────────────────┐
    │                        APPLICATION LAYER                       │
    │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
    │  │  FastAPI/Flask  │  │   Celery Queue  │  │  WebSocket Hub  │ │
    │  │    Services     │  │   Background    │  │   Real-time     │ │
    │  │                 │  │     Tasks       │  │   Processing    │ │
    │  └─────────────────┘  └─────────────────┘  └─────────────────┘ │
    └──────────────────────────────┬─────────────────────────────────┘
                                   │
    ┌──────────────────────────────▼─────────────────────────────────┐
    │                         LANGCHAIN LAYER                        │
    │ ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────┐ │
    │ │   Agents    │  │   Chains    │  │   Memory    │  │  Tools  │ │
    │ │    Pool     │  │   Factory   │  │   Manager   │  │ Registry│ │
    │ └─────────────┘  └─────────────┘  └─────────────┘  └─────────┘ │
    └──────────────────────────────┬─────────────────────────────────┘
                                   │
    ┌──────────────────────────────▼─────────────────────────────────┐
    │                      INFRASTRUCTURE LAYER                      │
    │ ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────┐ │
    │ │  Vector DB  │  │  LLM APIs   │  │    Redis    │  │  Logs & │ │
    │ │  (Pinecone/ │  │  (OpenAI/   │  │    Cache    │  │ Metrics │ │
    │ │  Weaviate)  │  │  Anthropic) │  │             │  │  (ELK)  │ │
    │ └─────────────┘  └─────────────┘  └─────────────┘  └─────────┘ │
    └────────────────────────────────────────────────────────────────┘

🚀 FastAPI Production Server

🌐 Enterprise API Server

python
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
import asyncio
import logging
import time
import json
import hashlib
import redis
import os
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Production configurations
class ProductionConfig:
    """Production configuration settings"""
    
    # API Settings
    API_VERSION = "v1"
    MAX_REQUEST_SIZE = 10 * 1024 * 1024  # 10MB
    REQUEST_TIMEOUT = 300  # 5 minutes
    
    # Rate Limiting
    RATE_LIMIT_REQUESTS = 100
    RATE_LIMIT_WINDOW = 3600  # 1 hour
    
    # Security
    ALLOWED_HOSTS = ["yourdomain.com", "api.yourdomain.com"]
    CORS_ORIGINS = ["https://yourdomain.com", "https://app.yourdomain.com"]
    
    # Redis Configuration
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
    
    # Monitoring
    ENABLE_METRICS = True
    LOG_LEVEL = "INFO"

# Request/Response Models
class QueryRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=1000)
    session_id: Optional[str] = None
    context: Optional[Dict[str, Any]] = None
    options: Optional[Dict[str, Any]] = Field(default_factory=dict)

class QueryResponse(BaseModel):
    response: str
    session_id: str
    metadata: Dict[str, Any]
    processing_time: float
    timestamp: datetime

class HealthResponse(BaseModel):
    status: str
    version: str
    uptime: float
    components: Dict[str, str]

# Production LangChain Manager
class ProductionLangChainManager:
    """Production-ready LangChain manager with pooling and caching"""
    
    def __init__(self):
        self.redis_client = redis.from_url(ProductionConfig.REDIS_URL)
        self.llm_pool = self._create_llm_pool()
        self.vector_store_pool = self._create_vector_store_pool()
        self.agent_factory = AgentFactory()
        self.memory_manager = MemoryManager(self.redis_client)
        
    def _create_llm_pool(self):
        """Create LLM connection pool"""
        from langchain_openai import ChatOpenAI
        
        # Create multiple LLM instances for load balancing
        pool = []
        for i in range(5):  # Pool of 5 LLM instances
            llm = ChatOpenAI(
                temperature=0.1,
                model="gpt-3.5-turbo",
                max_retries=2,
                request_timeout=60
            )
            pool.append(llm)
        
        return pool
    
    def _create_vector_store_pool(self):
        """Create vector store connection pool"""
        # Implementation depends on your vector store
        # Example with Pinecone or Weaviate
        return {
            "default": None,  # Initialize your vector store here
            "backup": None    # Backup vector store
        }
    
    async def get_llm(self):
        """Get LLM instance from pool"""
        # Simple round-robin selection
        if not hasattr(self, '_llm_index'):
            self._llm_index = 0
        
        llm = self.llm_pool[self._llm_index % len(self.llm_pool)]
        self._llm_index += 1
        return llm
    
    async def process_query(self, request: QueryRequest) -> QueryResponse:
        """Process query with production patterns"""
        start_time = time.time()
        
        try:
            # Get or create session
            session_id = request.session_id or f"session_{int(time.time())}"
            
            # Get LLM from pool (needed by both the agent and the summary memory)
            llm = await self.get_llm()
            
            # Load conversation memory for this session
            memory = await self.memory_manager.get_memory(session_id, llm)
            
            # Create appropriate agent/chain
            agent = await self.agent_factory.create_agent(
                llm=llm,
                memory=memory,
                tools=request.options.get("tools", [])
            )
            
            # Process query
            response = await agent.arun(request.query)
            
            # Save memory
            await self.memory_manager.save_memory(session_id, memory)
            
            # Cache result
            await self._cache_result(request.query, response, session_id)
            
            processing_time = time.time() - start_time
            
            return QueryResponse(
                response=response,
                session_id=session_id,
                metadata={
                    "tokens_used": getattr(llm, 'tokens_used', 0),
                    "model": "gpt-3.5-turbo",
                    "tools_used": request.options.get("tools", [])
                },
                processing_time=processing_time,
                timestamp=datetime.now()
            )
            
        except Exception as e:
            logger.error(f"Query processing error: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}")
    
    async def _cache_result(self, query: str, response: str, session_id: str):
        """Cache query result"""
        cache_key = f"query_cache:{hash(query + session_id)}"
        cache_data = {
            "query": query,
            "response": response,
            "timestamp": datetime.now().isoformat()
        }
        
        # Cache for 1 hour
        await asyncio.to_thread(
            self.redis_client.setex,
            cache_key,
            3600,
            json.dumps(cache_data)
        )

class AgentFactory:
    """Factory for creating different types of agents"""
    
    async def create_agent(self, llm, memory, tools: List[str]):
        """Create agent based on requirements"""
        from langchain.agents import AgentType, initialize_agent
        from langchain.tools import DuckDuckGoSearchRun
        
        # Initialize tools based on request
        agent_tools = []
        if "search" in tools:
            agent_tools.append(DuckDuckGoSearchRun())
        
        # Create agent
        if agent_tools:
            agent = initialize_agent(
                agent_tools,
                llm,
                agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                memory=memory,
                verbose=False,
                handle_parsing_errors=True
            )
        else:
            # Simple conversational agent
            from langchain.chains import ConversationChain
            agent = ConversationChain(
                llm=llm,
                memory=memory,
                verbose=False
            )
        
        return agent

class MemoryManager:
    """Manages conversation memory with Redis persistence"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    async def get_memory(self, session_id: str, llm):
        """Get conversation memory for a session, backed by the given LLM"""
        from langchain.memory import ConversationSummaryBufferMemory
        from langchain.schema import messages_from_dict
        
        memory_key = f"memory:{session_id}"
        
        def _new_memory():
            # Summary memory needs an LLM to generate its running summary
            return ConversationSummaryBufferMemory(
                llm=llm,
                max_token_limit=1000,
                return_messages=True
            )
        
        try:
            # Try to load existing memory
            memory_data = await asyncio.to_thread(self.redis.get, memory_key)
            
            if memory_data:
                # Restore memory from Redis
                memory_dict = json.loads(memory_data)
                memory = _new_memory()
                # Restore chat history (serialized with messages_to_dict in save_memory)
                for message in messages_from_dict(memory_dict.get("messages", [])):
                    memory.chat_memory.add_message(message)
                
                return memory
            
            # No stored history: create new memory
            return _new_memory()
                
        except Exception as e:
            logger.error(f"Memory loading error: {str(e)}")
            # Return new memory on error
            return _new_memory()
    
    async def save_memory(self, session_id: str, memory):
        """Save conversation memory to Redis"""
        memory_key = f"memory:{session_id}"
        
        try:
            memory_data = {
                "session_id": session_id,
                "messages": [msg.dict() for msg in memory.chat_memory.messages],
                "updated_at": datetime.now().isoformat()
            }
            
            # Save for 24 hours
            await asyncio.to_thread(
                self.redis.setex,
                memory_key,
                86400,
                json.dumps(memory_data, default=str)
            )
            
        except Exception as e:
            logger.error(f"Memory saving error: {str(e)}")

# Middleware for production
class ProductionMiddleware:
    """Custom middleware for production features"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    async def rate_limit_middleware(self, request: Request, call_next):
        """Rate limiting middleware"""
        client_ip = request.client.host
        rate_key = f"rate_limit:{client_ip}"
        
        try:
            current_requests = await asyncio.to_thread(self.redis.get, rate_key)
            current_requests = int(current_requests or 0)
            
            if current_requests >= ProductionConfig.RATE_LIMIT_REQUESTS:
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded"
                )
            
            # Increment counter
            pipe = self.redis.pipeline()
            pipe.incr(rate_key)
            pipe.expire(rate_key, ProductionConfig.RATE_LIMIT_WINDOW)
            await asyncio.to_thread(pipe.execute)
            
        except redis.RedisError:
            # Continue without rate limiting if Redis is down
            logger.warning("Redis unavailable for rate limiting")
        
        response = await call_next(request)
        return response

# Global manager instance
langchain_manager = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management"""
    global langchain_manager
    
    # Startup
    logger.info("Starting LangChain production server...")
    app.state.start_time = time.time()  # used by the /health uptime calculation
    langchain_manager = ProductionLangChainManager()
    
    yield
    
    # Shutdown
    logger.info("Shutting down LangChain production server...")
    # Cleanup resources
    if hasattr(langchain_manager, 'redis_client'):
        langchain_manager.redis_client.close()

# Create FastAPI app
app = FastAPI(
    title="LangChain Production API",
    description="Production-ready LangChain API server",
    version=ProductionConfig.API_VERSION,
    lifespan=lifespan
)

# Security
security = HTTPBearer()

# Add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=ProductionConfig.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=ProductionConfig.ALLOWED_HOSTS
)

# Authentication dependency
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify API token"""
    # Implement your token verification logic
    # For demo, we'll accept any token
    if not credentials.credentials:
        raise HTTPException(status_code=401, detail="Invalid token")
    return credentials.credentials

# API Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    uptime = time.time() - getattr(app.state, 'start_time', time.time())
    
    # Check component health
    components = {"api": "healthy"}
    
    try:
        langchain_manager.redis_client.ping()
        components["redis"] = "healthy"
    except Exception:
        components["redis"] = "unhealthy"
    
    return HealthResponse(
        status="healthy" if all(status == "healthy" for status in components.values()) else "degraded",
        version=ProductionConfig.API_VERSION,
        uptime=uptime,
        components=components
    )

@app.post("/query", response_model=QueryResponse)
async def process_query(
    request: QueryRequest,
    background_tasks: BackgroundTasks,
    token: str = Depends(verify_token)
):
    """Process LangChain query"""
    try:
        # Log request
        logger.info(f"Processing query: {request.query[:100]}...")
        
        # Process query
        response = await langchain_manager.process_query(request)
        
        # Log successful response
        background_tasks.add_task(
            log_query_metrics,
            request.query,
            response.processing_time,
            "success"
        )
        
        return response
        
    except Exception as e:
        # Log error
        background_tasks.add_task(
            log_query_metrics,
            request.query,
            0,
            "error",
            str(e)
        )
        raise

@app.get("/sessions/{session_id}/memory")
async def get_session_memory(
    session_id: str,
    token: str = Depends(verify_token)
):
    """Get conversation memory for session"""
    try:
        memory_key = f"memory:{session_id}"
        memory_data = langchain_manager.redis_client.get(memory_key)
        
        if memory_data:
            return json.loads(memory_data)
        else:
            raise HTTPException(status_code=404, detail="Session not found")
            
    except json.JSONDecodeError:
        raise HTTPException(status_code=500, detail="Memory data corrupted")

@app.delete("/sessions/{session_id}")
async def delete_session(
    session_id: str,
    token: str = Depends(verify_token)
):
    """Delete session and its memory"""
    memory_key = f"memory:{session_id}"
    deleted = langchain_manager.redis_client.delete(memory_key)
    
    return {"deleted": bool(deleted), "session_id": session_id}

async def log_query_metrics(query: str, processing_time: float, status: str, error: str = None):
    """Log query metrics for monitoring"""
    metrics = {
        "timestamp": datetime.now().isoformat(),
        "query_length": len(query),
        "processing_time": processing_time,
        "status": status,
        "error": error
    }
    
    logger.info(f"Query metrics: {json.dumps(metrics)}")

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=1,  # Use 1 for development, multiple for production
        log_level="info"
    )
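
With the server above running, the /query endpoint can be exercised with a small client script. This is a minimal sketch rather than part of the server code: it assumes the API is listening on http://localhost:8000 and that, as in the demo verify_token dependency, any non-empty bearer token is accepted.

python
import httpx  # assumes httpx is installed in the client environment

def ask(query: str, session_id: str | None = None, token: str = "dev-token") -> dict:
    """Send a query to the API and return the parsed QueryResponse payload."""
    payload = {"query": query, "session_id": session_id, "options": {"tools": []}}
    response = httpx.post(
        "http://localhost:8000/query",
        json=payload,
        headers={"Authorization": f"Bearer {token}"},
        timeout=60.0,
    )
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    result = ask("Summarize our deployment checklist")
    print(result["response"], result["session_id"], result["processing_time"])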

🐳 Docker Production Deployment

📦 Multi-Stage Docker Build

dockerfile
# Multi-stage Docker build for production
FROM python:3.11-slim as builder

# Set build arguments
ARG POETRY_VERSION=1.6.1

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Poetry
RUN pip install poetry==$POETRY_VERSION

# Set Poetry configuration
ENV POETRY_NO_INTERACTION=1 \
    POETRY_VIRTUALENVS_IN_PROJECT=1 \
    POETRY_CACHE_DIR=/tmp/poetry_cache

# Build inside /app so the in-project virtualenv lands at /app/.venv
WORKDIR /app

# Copy dependency files
COPY pyproject.toml poetry.lock ./

# Install runtime dependencies only (the application code is copied in the next stage)
RUN poetry install --only main --no-root && rm -rf $POETRY_CACHE_DIR

# Production stage
FROM python:3.11-slim as production

# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Set environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PATH="/app/.venv/bin:$PATH" \
    WORKERS=4 \
    PORT=8000

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy virtual environment from builder
COPY --from=builder /app/.venv /app/.venv

# Set working directory
WORKDIR /app

# Copy application code
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:$PORT/health || exit 1

# Expose port
EXPOSE $PORT

# Run application
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port $PORT --workers $WORKERS"]

πŸ™ Docker Compose for Development ​

yaml
# docker-compose.yml
version: '3.8'

services:
  langchain-api:
    build:
      context: .
      dockerfile: Dockerfile
      target: production
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_LEVEL=INFO
    depends_on:
      - redis
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - langchain-api
    restart: unless-stopped

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    restart: unless-stopped

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    restart: unless-stopped

volumes:
  redis_data:
  prometheus_data:
  grafana_data:
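
Note that the Compose file mounts nginx.conf, ssl/, and prometheus.yml from the project root: the Nginx configuration is shown in the next section, and prometheus.yml should define a scrape job pointing at the API's /metrics endpoint (added in the monitoring section below).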

🌐 Nginx Configuration

nginx
# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream langchain_backend {
        server langchain-api:8000;
    }

    # Rate limiting
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

    # Gzip compression
    gzip on;
    gzip_types text/plain application/json application/javascript text/css;

    server {
        listen 80;
        server_name yourdomain.com;

        # Redirect HTTP to HTTPS
        return 301 https://$server_name$request_uri;
    }

    server {
        listen 443 ssl http2;
        server_name yourdomain.com;

        # SSL configuration
        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/key.pem;
        ssl_protocols TLSv1.2 TLSv1.3;

        # Security headers
        add_header X-Frame-Options DENY;
        add_header X-Content-Type-Options nosniff;
        add_header X-XSS-Protection "1; mode=block";
        add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";

        # API endpoints
        location /api/ {
            # Apply rate limiting
            limit_req zone=api burst=20 nodelay;

            # Proxy to backend
            proxy_pass http://langchain_backend/;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Timeouts
            proxy_connect_timeout 60s;
            proxy_send_timeout 60s;
            proxy_read_timeout 300s;

            # Buffer settings
            proxy_buffering on;
            proxy_buffer_size 4k;
            proxy_buffers 8 4k;
        }

        # Health check (no rate limiting)
        location /health {
            proxy_pass http://langchain_backend/health;
            access_log off;
        }

        # Static files
        location /static/ {
            alias /app/static/;
            expires 1y;
            add_header Cache-Control "public, immutable";
        }
    }
}

🔧 Kubernetes Production Deployment

βš™οΈ Kubernetes Manifests ​

yaml
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: langchain-prod

---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: langchain-config
  namespace: langchain-prod
data:
  REDIS_URL: "redis://redis-service:6379"
  LOG_LEVEL: "INFO"
  WORKERS: "4"

---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: langchain-secrets
  namespace: langchain-prod
type: Opaque
data:
  OPENAI_API_KEY: # base64 encoded API key
  JWT_SECRET: # base64 encoded JWT secret

---
# k8s/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: langchain-prod
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        command: ["redis-server", "--appendonly", "yes"]
        volumeMounts:
        - name: redis-storage
          mountPath: /data
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
      volumes:
      - name: redis-storage
        persistentVolumeClaim:
          claimName: redis-pvc
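
---
# k8s/redis-pvc.yaml
# The redis-pvc claim referenced above is not defined elsewhere in this guide;
# this is a minimal sketch assuming the cluster's default StorageClass.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: langchain-prod
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi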

---
# k8s/redis-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: langchain-prod
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379

---
# k8s/langchain-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-api
  namespace: langchain-prod
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langchain-api
  template:
    metadata:
      labels:
        app: langchain-api
    spec:
      containers:
      - name: langchain-api
        image: your-registry/langchain-api:latest
        ports:
        - containerPort: 8000
        envFrom:
        - configMapRef:
            name: langchain-config
        - secretRef:
            name: langchain-secrets
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        volumeMounts:
        - name: logs
          mountPath: /app/logs
      volumes:
      - name: logs
        emptyDir: {}

---
# k8s/langchain-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: langchain-service
  namespace: langchain-prod
spec:
  selector:
    app: langchain-api
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP

---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: langchain-ingress
  namespace: langchain-prod
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
    # ~100 requests per minute per client IP via ingress-nginx
    nginx.ingress.kubernetes.io/limit-rpm: "100"
spec:
  tls:
  - hosts:
    - api.yourdomain.com
    secretName: langchain-tls
  rules:
  - host: api.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: langchain-service
            port:
              number: 80

---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langchain-hpa
  namespace: langchain-prod
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langchain-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

📊 Production Monitoring

📈 Prometheus Metrics

python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
import time

# Define metrics
request_count = Counter(
    'langchain_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'langchain_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

active_sessions = Gauge(
    'langchain_active_sessions',
    'Number of active sessions'
)

llm_tokens_used = Counter(
    'langchain_llm_tokens_total',
    'Total LLM tokens used',
    ['model', 'type']  # type: input/output
)

memory_usage = Gauge(
    'langchain_memory_usage_bytes',
    'Memory usage in bytes'
)

# Metrics middleware
class MetricsMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        start_time = time.time()
        method = scope["method"]
        path = scope["path"]

        # Process request
        response_sent = False
        status_code = 500

        async def send_wrapper(message):
            nonlocal response_sent, status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            elif message["type"] == "http.response.body" and not response_sent:
                response_sent = True
                
                # Record metrics
                duration = time.time() - start_time
                request_count.labels(
                    method=method,
                    endpoint=path,
                    status=status_code
                ).inc()
                request_duration.labels(
                    method=method,
                    endpoint=path
                ).observe(duration)

            await send(message)

        await self.app(scope, receive, send_wrapper)

# Add metrics endpoint
@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(
        generate_latest(),
        media_type="text/plain"
    )

# Add metrics middleware to app
# app.add_middleware(MetricsMiddleware)
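
The llm_tokens_used counter above is declared but never incremented by the server code. One hedged way to feed it is to wrap agent calls with LangChain's get_openai_callback, which tracks prompt and completion tokens; the helper name and model label below are illustrative, not part of the original server.

python
from langchain.callbacks import get_openai_callback

async def run_with_token_metrics(agent, query: str, model: str = "gpt-3.5-turbo") -> str:
    """Run an agent call and record token usage on the Prometheus counters above."""
    with get_openai_callback() as cb:
        result = await agent.arun(query)
    # 'type' distinguishes input (prompt) from output (completion) tokens
    llm_tokens_used.labels(model=model, type="input").inc(cb.prompt_tokens)
    llm_tokens_used.labels(model=model, type="output").inc(cb.completion_tokens)
    return result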

📊 Structured Logging

python
import structlog
from pythonjsonlogger import jsonlogger
import logging
import sys

def setup_logging():
    """Setup structured logging"""
    
    # Configure structlog
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Configure standard logging
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'
    )

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(logging.INFO)

    return structlog.get_logger()

logger = setup_logging()

# Use structured logging in your application
class ProductionLogger:
    def __init__(self):
        self.logger = structlog.get_logger()

    def log_query(self, query: str, session_id: str, processing_time: float, status: str):
        """Log query processing"""
        self.logger.info(
            "query_processed",
            query_length=len(query),
            session_id=session_id,
            processing_time=processing_time,
            status=status
        )

    def log_error(self, error: Exception, context: dict):
        """Log error with context"""
        self.logger.error(
            "error_occurred",
            error=str(error),
            error_type=type(error).__name__,
            **context
        )

    def log_performance(self, operation: str, duration: float, metadata: dict):
        """Log performance metrics"""
        self.logger.info(
            "performance_metric",
            operation=operation,
            duration=duration,
            **metadata
        )
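
A short usage sketch showing where the ProductionLogger calls fit around query processing; the wrapper below is hypothetical glue, not part of the server code above.

python
import time

async def handle_query_with_logging(manager, request):
    """Wrap process_query with structured success/error logging."""
    plogger = ProductionLogger()
    start = time.time()
    try:
        result = await manager.process_query(request)
        plogger.log_query(request.query, result.session_id, result.processing_time, "success")
        return result
    except Exception as exc:
        plogger.log_error(exc, {"session_id": request.session_id, "duration": time.time() - start})
        raise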

🚦 Production Deployment Pipeline

🔄 GitHub Actions CI/CD

yaml
# .github/workflows/deploy.yml
name: Deploy to Production

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install Poetry
      uses: snok/install-poetry@v1
    
    - name: Install dependencies
      run: poetry install
    
    - name: Run tests
      run: |
        poetry run pytest
        poetry run black --check .
        poetry run isort --check-only .
        poetry run flake8
    
    - name: Run security scan
      run: poetry run bandit -r src/

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
    - uses: actions/checkout@v4
    
    - name: Log in to Container Registry
      uses: docker/login-action@v2
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    
    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v4
      with:
        images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
        tags: |
          type=ref,event=branch
          type=ref,event=pr
          type=sha
    
    - name: Build and push Docker image
      uses: docker/build-push-action@v4
      with:
        context: .
        push: true
        tags: ${{ steps.meta.outputs.tags }}
        labels: ${{ steps.meta.outputs.labels }}

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
    - uses: actions/checkout@v4
    
    - name: Configure kubectl
      run: |
        echo "${{ secrets.KUBECONFIG }}" | base64 -d > kubeconfig
        export KUBECONFIG=kubeconfig
    
    - name: Deploy to Kubernetes
      run: |
        export KUBECONFIG=kubeconfig
        kubectl set image deployment/langchain-api \
          langchain-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
          -n langchain-prod
        kubectl rollout status deployment/langchain-api -n langchain-prod
    
    - name: Run smoke tests
      run: |
        # Wait for deployment
        sleep 30
        # Run smoke tests against production
        curl -f https://api.yourdomain.com/health
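
The final step above uses curl; for slightly richer checks, a small Python smoke test can assert on the /health payload defined earlier. This is a sketch that assumes httpx is available in the CI job and that the hostname matches your ingress.

python
import sys
import httpx  # assumes httpx is installed in the CI environment

def smoke_test(base_url: str = "https://api.yourdomain.com") -> int:
    """Return 0 if the deployed API reports a healthy status, 1 otherwise."""
    resp = httpx.get(f"{base_url}/health", timeout=10)
    resp.raise_for_status()
    body = resp.json()
    print("status:", body["status"], "components:", body.get("components"))
    return 0 if body["status"] == "healthy" else 1

if __name__ == "__main__":
    sys.exit(smoke_test())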

🔗 Next Steps

Ready for security and monitoring? Continue with the follow-up guides on securing and monitoring your LangChain deployment.


Key Production Patterns:

  • FastAPI architecture provides scalable HTTP API foundation
  • Connection pooling handles concurrent requests efficiently
  • Redis caching improves response times and reduces costs
  • Docker containerization ensures consistent deployments
  • Kubernetes orchestration provides scalability and reliability
  • Monitoring and logging enable production observability
  • CI/CD pipelines automate testing and deployment
  • Security layers protect against common vulnerabilities
