# Production Patterns - Scaling LangChain Systems
Learn enterprise-grade patterns for deploying, scaling, and maintaining LangChain applications in production environments
## Production Readiness Overview
Building production LangChain applications requires careful consideration of scalability, reliability, monitoring, and maintenance. This guide covers battle-tested patterns for enterprise deployment.
## Production System Architecture
```text
              PRODUCTION LANGCHAIN ARCHITECTURE
                 (Enterprise-grade deployment)

┌─ CLIENT LAYER ────────────────────────────────────────────────
│   Web App · Mobile App · API Clients
│
├─ API GATEWAY ─────────────────────────────────────────────────
│   Authentication · Rate Limiting · Load Balancing
│   Request Routing · SSL Termination · CORS Handling
│
├─ APPLICATION LAYER ───────────────────────────────────────────
│   FastAPI/Flask Services · Celery Background Tasks · WebSocket Hub (real-time)
│
├─ LANGCHAIN LAYER ─────────────────────────────────────────────
│   Agents Pool · Chains Factory · Memory Manager · Tools Registry
│
└─ INFRASTRUCTURE LAYER ────────────────────────────────────────
    Vector DB (Pinecone/Weaviate) · LLM APIs (OpenAI/Anthropic)
    Redis Cache · Logs & Metrics (ELK)
```

## FastAPI Production Server
### Enterprise API Server
```python
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
import asyncio
import logging
import time
import json
import redis
import os
from datetime import datetime, timedelta
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Production configurations
class ProductionConfig:
"""Production configuration settings"""
# API Settings
API_VERSION = "v1"
MAX_REQUEST_SIZE = 10 * 1024 * 1024 # 10MB
REQUEST_TIMEOUT = 300 # 5 minutes
# Rate Limiting
RATE_LIMIT_REQUESTS = 100
RATE_LIMIT_WINDOW = 3600 # 1 hour
# Security
ALLOWED_HOSTS = ["yourdomain.com", "api.yourdomain.com"]
CORS_ORIGINS = ["https://yourdomain.com", "https://app.yourdomain.com"]
# Redis Configuration
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
# Monitoring
ENABLE_METRICS = True
LOG_LEVEL = "INFO"
# Request/Response Models
class QueryRequest(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
session_id: Optional[str] = None
context: Optional[Dict[str, Any]] = None
options: Optional[Dict[str, Any]] = Field(default_factory=dict)
class QueryResponse(BaseModel):
response: str
session_id: str
metadata: Dict[str, Any]
processing_time: float
timestamp: datetime
class HealthResponse(BaseModel):
status: str
version: str
uptime: float
components: Dict[str, str]
# Production LangChain Manager
class ProductionLangChainManager:
"""Production-ready LangChain manager with pooling and caching"""
def __init__(self):
self.redis_client = redis.from_url(ProductionConfig.REDIS_URL)
self.llm_pool = self._create_llm_pool()
self.vector_store_pool = self._create_vector_store_pool()
self.agent_factory = AgentFactory()
self.memory_manager = MemoryManager(self.redis_client)
def _create_llm_pool(self):
"""Create LLM connection pool"""
from langchain_openai import ChatOpenAI
# Create multiple LLM instances for load balancing
pool = []
for i in range(5): # Pool of 5 LLM instances
llm = ChatOpenAI(
temperature=0.1,
model="gpt-3.5-turbo",
max_retries=2,
request_timeout=60
)
pool.append(llm)
return pool
def _create_vector_store_pool(self):
"""Create vector store connection pool"""
# Implementation depends on your vector store
# Example with Pinecone or Weaviate
return {
"default": None, # Initialize your vector store here
"backup": None # Backup vector store
}
async def get_llm(self):
"""Get LLM instance from pool"""
# Simple round-robin selection
if not hasattr(self, '_llm_index'):
self._llm_index = 0
llm = self.llm_pool[self._llm_index % len(self.llm_pool)]
self._llm_index += 1
return llm
async def process_query(self, request: QueryRequest) -> QueryResponse:
"""Process query with production patterns"""
start_time = time.time()
try:
# Get or create session
session_id = request.session_id or f"session_{int(time.time())}"
            # Get LLM from pool (also needed by the summary-buffer memory)
            llm = await self.get_llm()
            # Load conversation memory
            memory = await self.memory_manager.get_memory(session_id, llm)
# Create appropriate agent/chain
agent = await self.agent_factory.create_agent(
llm=llm,
memory=memory,
tools=request.options.get("tools", [])
)
# Process query
response = await agent.arun(request.query)
# Save memory
await self.memory_manager.save_memory(session_id, memory)
# Cache result
await self._cache_result(request.query, response, session_id)
processing_time = time.time() - start_time
return QueryResponse(
response=response,
session_id=session_id,
metadata={
"tokens_used": getattr(llm, 'tokens_used', 0),
"model": "gpt-3.5-turbo",
"tools_used": request.options.get("tools", [])
},
processing_time=processing_time,
timestamp=datetime.now()
)
except Exception as e:
logger.error(f"Query processing error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}")
async def _cache_result(self, query: str, response: str, session_id: str):
"""Cache query result"""
cache_key = f"query_cache:{hash(query + session_id)}"
cache_data = {
"query": query,
"response": response,
"timestamp": datetime.now().isoformat()
}
# Cache for 1 hour
await asyncio.to_thread(
self.redis_client.setex,
cache_key,
3600,
json.dumps(cache_data)
)
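# --- Hedged sketch (not part of the original manager): a read-through cache lookup
# --- to pair with _cache_result above. Returns a cached answer if one exists.
# Note: Python's built-in hash() is not stable across processes, so a real
# deployment would typically derive the cache key with hashlib instead.
async def get_cached_result(redis_client, query: str, session_id: str) -> Optional[str]:
    cache_key = f"query_cache:{hash(query + session_id)}"
    cached = await asyncio.to_thread(redis_client.get, cache_key)
    if cached:
        return json.loads(cached).get("response")
    return None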
class AgentFactory:
"""Factory for creating different types of agents"""
async def create_agent(self, llm, memory, tools: List[str]):
"""Create agent based on requirements"""
from langchain.agents import AgentType, initialize_agent
from langchain.tools import DuckDuckGoSearchRun
# Initialize tools based on request
agent_tools = []
if "search" in tools:
agent_tools.append(DuckDuckGoSearchRun())
# Create agent
if agent_tools:
agent = initialize_agent(
agent_tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
memory=memory,
verbose=False,
handle_parsing_errors=True
)
else:
# Simple conversational agent
from langchain.chains import ConversationChain
agent = ConversationChain(
llm=llm,
memory=memory,
verbose=False
)
return agent
class MemoryManager:
"""Manages conversation memory with Redis persistence"""
def __init__(self, redis_client):
self.redis = redis_client
    async def get_memory(self, session_id: str, llm):
        """Get conversation memory for a session (llm drives summary pruning)"""
        from langchain.memory import ConversationSummaryBufferMemory
        from langchain.schema import messages_from_dict
memory_key = f"memory:{session_id}"
try:
# Try to load existing memory
memory_data = await asyncio.to_thread(self.redis.get, memory_key)
if memory_data:
# Restore memory from Redis
memory_dict = json.loads(memory_data)
memory = ConversationSummaryBufferMemory(
                    llm=llm,
max_token_limit=1000,
return_messages=True
)
                # Restore chat history (stored via messages_to_dict in save_memory)
                for message in messages_from_dict(memory_dict.get("messages", [])):
                    memory.chat_memory.add_message(message)
return memory
else:
# Create new memory
return ConversationSummaryBufferMemory(
                    llm=llm,
max_token_limit=1000,
return_messages=True
)
except Exception as e:
logger.error(f"Memory loading error: {str(e)}")
# Return new memory on error
return ConversationSummaryBufferMemory(
                llm=llm,
max_token_limit=1000,
return_messages=True
)
async def save_memory(self, session_id: str, memory):
"""Save conversation memory to Redis"""
memory_key = f"memory:{session_id}"
try:
            from langchain.schema import messages_to_dict
            memory_data = {
                "session_id": session_id,
                "messages": messages_to_dict(memory.chat_memory.messages),
"updated_at": datetime.now().isoformat()
}
# Save for 24 hours
await asyncio.to_thread(
self.redis.setex,
memory_key,
86400,
json.dumps(memory_data, default=str)
)
except Exception as e:
logger.error(f"Memory saving error: {str(e)}")
# Middleware for production
class ProductionMiddleware:
"""Custom middleware for production features"""
def __init__(self, redis_client):
self.redis = redis_client
async def rate_limit_middleware(self, request: Request, call_next):
"""Rate limiting middleware"""
client_ip = request.client.host
rate_key = f"rate_limit:{client_ip}"
try:
current_requests = await asyncio.to_thread(self.redis.get, rate_key)
current_requests = int(current_requests or 0)
if current_requests >= ProductionConfig.RATE_LIMIT_REQUESTS:
raise HTTPException(
status_code=429,
detail="Rate limit exceeded"
)
# Increment counter
pipe = self.redis.pipeline()
pipe.incr(rate_key)
pipe.expire(rate_key, ProductionConfig.RATE_LIMIT_WINDOW)
await asyncio.to_thread(pipe.execute)
except redis.RedisError:
# Continue without rate limiting if Redis is down
logger.warning("Redis unavailable for rate limiting")
response = await call_next(request)
return response
# Global manager instance
langchain_manager = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan management"""
global langchain_manager
# Startup
    logger.info("Starting LangChain production server...")
    app.state.start_time = time.time()  # referenced by /health to report uptime
    langchain_manager = ProductionLangChainManager()
yield
# Shutdown
logger.info("Shutting down LangChain production server...")
# Cleanup resources
if hasattr(langchain_manager, 'redis_client'):
langchain_manager.redis_client.close()
# Create FastAPI app
app = FastAPI(
title="LangChain Production API",
description="Production-ready LangChain API server",
version=ProductionConfig.API_VERSION,
lifespan=lifespan
)
# Security
security = HTTPBearer()
# Add middleware
app.add_middleware(
CORSMiddleware,
allow_origins=ProductionConfig.CORS_ORIGINS,
allow_credentials=True,
    allow_methods=["GET", "POST", "DELETE"],
allow_headers=["*"],
)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=ProductionConfig.ALLOWED_HOSTS
)
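# --- Optional wiring (sketch): run the Redis-backed rate limiter defined above on every
# --- request. Assumes langchain_manager has been initialised by the lifespan handler.
# HTTPException raised inside HTTP middleware bypasses FastAPI's exception handlers,
# so rejected requests are converted into an explicit 429 JSON response here.
from fastapi.responses import JSONResponse

@app.middleware("http")
async def apply_rate_limit(request: Request, call_next):
    if langchain_manager is None:
        return await call_next(request)
    limiter = ProductionMiddleware(langchain_manager.redis_client)
    try:
        return await limiter.rate_limit_middleware(request, call_next)
    except HTTPException as exc:
        return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})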
# Authentication dependency
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify API token"""
# Implement your token verification logic
# For demo, we'll accept any token
if not credentials.credentials:
raise HTTPException(status_code=401, detail="Invalid token")
return credentials.credentials
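# --- Hedged sketch: swap the demo check above for real JWT validation. ---
# Assumes the PyJWT package and a JWT_SECRET environment variable; adjust the
# algorithm and expected claims to match your identity provider.
def decode_jwt(token: str) -> Dict[str, Any]:
    import jwt  # PyJWT

    try:
        return jwt.decode(token, os.getenv("JWT_SECRET", ""), algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")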
# API Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint"""
uptime = time.time() - getattr(app.state, 'start_time', time.time())
# Check component health
components = {"api": "healthy"}
try:
langchain_manager.redis_client.ping()
components["redis"] = "healthy"
    except Exception:
components["redis"] = "unhealthy"
return HealthResponse(
status="healthy" if all(status == "healthy" for status in components.values()) else "degraded",
version=ProductionConfig.API_VERSION,
uptime=uptime,
components=components
)
@app.post("/query", response_model=QueryResponse)
async def process_query(
request: QueryRequest,
background_tasks: BackgroundTasks,
token: str = Depends(verify_token)
):
"""Process LangChain query"""
try:
# Log request
logger.info(f"Processing query: {request.query[:100]}...")
# Process query
response = await langchain_manager.process_query(request)
# Log successful response
background_tasks.add_task(
log_query_metrics,
request.query,
response.processing_time,
"success"
)
return response
except Exception as e:
# Log error
background_tasks.add_task(
log_query_metrics,
request.query,
0,
"error",
str(e)
)
raise
@app.get("/sessions/{session_id}/memory")
async def get_session_memory(
session_id: str,
token: str = Depends(verify_token)
):
"""Get conversation memory for session"""
try:
memory_key = f"memory:{session_id}"
memory_data = langchain_manager.redis_client.get(memory_key)
if memory_data:
return json.loads(memory_data)
else:
raise HTTPException(status_code=404, detail="Session not found")
except json.JSONDecodeError:
raise HTTPException(status_code=500, detail="Memory data corrupted")
@app.delete("/sessions/{session_id}")
async def delete_session(
session_id: str,
token: str = Depends(verify_token)
):
"""Delete session and its memory"""
memory_key = f"memory:{session_id}"
deleted = langchain_manager.redis_client.delete(memory_key)
return {"deleted": bool(deleted), "session_id": session_id}
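# --- Client usage sketch (not executed by the server). ---
# Assumes the API is reachable on localhost:8000, the httpx package is installed,
# and that any bearer token passes the demo verify_token dependency above.
def demo_client_call():
    import httpx

    headers = {"Authorization": "Bearer demo-token"}
    payload = {"query": "Summarize today's error logs", "options": {"tools": []}}
    resp = httpx.post("http://localhost:8000/query", json=payload, headers=headers, timeout=60)
    resp.raise_for_status()
    data = resp.json()
    print(data["response"], f"({data['processing_time']:.2f}s)")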
async def log_query_metrics(query: str, processing_time: float, status: str, error: str = None):
"""Log query metrics for monitoring"""
metrics = {
"timestamp": datetime.now().isoformat(),
"query_length": len(query),
"processing_time": processing_time,
"status": status,
"error": error
}
logger.info(f"Query metrics: {json.dumps(metrics)}")
# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
workers=1, # Use 1 for development, multiple for production
log_level="info"
    )
```

## Docker Production Deployment
### Multi-Stage Docker Build
```dockerfile
# Multi-stage Docker build for production
FROM python:3.11-slim as builder
# Set build arguments
ARG POETRY_VERSION=1.6.1
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install Poetry
RUN pip install poetry==$POETRY_VERSION
# Set Poetry configuration
ENV POETRY_NO_INTERACTION=1 \
    POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache
# Copy dependency files
COPY pyproject.toml poetry.lock ./
# Install dependencies
RUN poetry install --only main --no-root && rm -rf $POETRY_CACHE_DIR
# Production stage
FROM python:3.11-slim as production
# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Set environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PATH="/app/.venv/bin:$PATH" \
WORKERS=4 \
PORT=8000
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy virtual environment from builder
COPY --from=builder /.venv /app/.venv
# Set working directory
WORKDIR /app
# Copy application code
COPY --chown=appuser:appuser . .
# Switch to non-root user
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:$PORT/health || exit 1
# Expose port
EXPOSE $PORT
# Run application
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port $PORT --workers $WORKERS"]
```

### Docker Compose for Development
```yaml
# docker-compose.yml
version: '3.8'
services:
langchain-api:
build:
context: .
dockerfile: Dockerfile
target: production
ports:
- "8000:8000"
environment:
- REDIS_URL=redis://redis:6379
- OPENAI_API_KEY=${OPENAI_API_KEY}
- LOG_LEVEL=INFO
depends_on:
- redis
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- langchain-api
restart: unless-stopped
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
restart: unless-stopped
grafana:
image: grafana/grafana
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
restart: unless-stopped
volumes:
redis_data:
prometheus_data:
  grafana_data:
```

### Nginx Configuration
```nginx
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream langchain_backend {
server langchain-api:8000;
}
# Rate limiting
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
# Gzip compression
gzip on;
gzip_types text/plain application/json application/javascript text/css;
server {
listen 80;
server_name yourdomain.com;
# Redirect HTTP to HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name yourdomain.com;
# SSL configuration
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
# Security headers
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";
# API endpoints
location /api/ {
# Apply rate limiting
limit_req zone=api burst=20 nodelay;
# Proxy to backend
proxy_pass http://langchain_backend/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 300s;
# Buffer settings
proxy_buffering on;
proxy_buffer_size 4k;
proxy_buffers 8 4k;
}
# Health check (no rate limiting)
location /health {
proxy_pass http://langchain_backend/health;
access_log off;
}
# Static files
location /static/ {
alias /app/static/;
expires 1y;
add_header Cache-Control "public, immutable";
}
}
}
```

## Kubernetes Production Deployment
### Kubernetes Manifests
```yaml
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: langchain-prod
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: langchain-config
namespace: langchain-prod
data:
REDIS_URL: "redis://redis-service:6379"
LOG_LEVEL: "INFO"
WORKERS: "4"
---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: langchain-secrets
namespace: langchain-prod
type: Opaque
data:
OPENAI_API_KEY: # base64 encoded API key
JWT_SECRET: # base64 encoded JWT secret
---
# k8s/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
namespace: langchain-prod
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
command: ["redis-server", "--appendonly", "yes"]
volumeMounts:
- name: redis-storage
mountPath: /data
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
volumes:
- name: redis-storage
persistentVolumeClaim:
claimName: redis-pvc
---
# k8s/redis-service.yaml
apiVersion: v1
kind: Service
metadata:
name: redis-service
namespace: langchain-prod
spec:
selector:
app: redis
ports:
- port: 6379
targetPort: 6379
---
# k8s/langchain-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: langchain-api
namespace: langchain-prod
spec:
replicas: 3
selector:
matchLabels:
app: langchain-api
template:
metadata:
labels:
app: langchain-api
spec:
containers:
- name: langchain-api
image: your-registry/langchain-api:latest
ports:
- containerPort: 8000
envFrom:
- configMapRef:
name: langchain-config
- secretRef:
name: langchain-secrets
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
volumeMounts:
- name: logs
mountPath: /app/logs
volumes:
- name: logs
emptyDir: {}
---
# k8s/langchain-service.yaml
apiVersion: v1
kind: Service
metadata:
name: langchain-service
namespace: langchain-prod
spec:
selector:
app: langchain-api
ports:
- port: 80
targetPort: 8000
type: ClusterIP
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: langchain-ingress
namespace: langchain-prod
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/limit-rpm: "100"
spec:
tls:
- hosts:
- api.yourdomain.com
secretName: langchain-tls
rules:
- host: api.yourdomain.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: langchain-service
port:
number: 80
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: langchain-hpa
namespace: langchain-prod
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: langchain-api
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
        averageUtilization: 80
```

## Production Monitoring
### Prometheus Metrics
```python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
import time
# Define metrics
request_count = Counter(
'langchain_requests_total',
'Total number of requests',
['method', 'endpoint', 'status']
)
request_duration = Histogram(
'langchain_request_duration_seconds',
'Request duration in seconds',
['method', 'endpoint']
)
active_sessions = Gauge(
'langchain_active_sessions',
'Number of active sessions'
)
llm_tokens_used = Counter(
'langchain_llm_tokens_total',
'Total LLM tokens used',
['model', 'type'] # type: input/output
)
memory_usage = Gauge(
'langchain_memory_usage_bytes',
'Memory usage in bytes'
)
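# --- Hedged sketch: feed the llm_tokens_used counter from LangChain's OpenAI callback. ---
# Wrap the chain/agent invocation (e.g. inside process_query) the same way for
# per-request counts; get_openai_callback only tracks OpenAI-compatible models.
from langchain.callbacks import get_openai_callback

def record_token_usage(chain, prompt: str, model: str = "gpt-3.5-turbo"):
    with get_openai_callback() as cb:
        result = chain.run(prompt)
    llm_tokens_used.labels(model=model, type="input").inc(cb.prompt_tokens)
    llm_tokens_used.labels(model=model, type="output").inc(cb.completion_tokens)
    return result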
# Metrics middleware
class MetricsMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
start_time = time.time()
method = scope["method"]
path = scope["path"]
# Process request
response_sent = False
status_code = 500
async def send_wrapper(message):
nonlocal response_sent, status_code
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "http.response.body" and not response_sent:
response_sent = True
# Record metrics
duration = time.time() - start_time
request_count.labels(
method=method,
endpoint=path,
status=status_code
).inc()
request_duration.labels(
method=method,
endpoint=path
).observe(duration)
await send(message)
await self.app(scope, receive, send_wrapper)
# Add metrics endpoint
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
return Response(
generate_latest(),
media_type="text/plain"
)
# Add metrics middleware to app
# app.add_middleware(MetricsMiddleware)
```

### Structured Logging
```python
import structlog
from pythonjsonlogger import jsonlogger
import logging
import sys
def setup_logging():
"""Setup structured logging"""
# Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Configure standard logging
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)
return structlog.get_logger()
logger = setup_logging()
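# Quick sanity check (sketch): emits a single JSON-formatted event to stdout.
logger.info("logging_configured", service="langchain-api", log_format="json")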
# Use structured logging in your application
class ProductionLogger:
def __init__(self):
self.logger = structlog.get_logger()
def log_query(self, query: str, session_id: str, processing_time: float, status: str):
"""Log query processing"""
self.logger.info(
"query_processed",
query_length=len(query),
session_id=session_id,
processing_time=processing_time,
status=status
)
def log_error(self, error: Exception, context: dict):
"""Log error with context"""
self.logger.error(
"error_occurred",
error=str(error),
error_type=type(error).__name__,
**context
)
def log_performance(self, operation: str, duration: float, metadata: dict):
"""Log performance metrics"""
self.logger.info(
"performance_metric",
operation=operation,
duration=duration,
**metadata
        )
```

## Production Deployment Pipeline
### GitHub Actions CI/CD
```yaml
# .github/workflows/deploy.yml
name: Deploy to Production
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install Poetry
uses: snok/install-poetry@v1
- name: Install dependencies
run: poetry install
- name: Run tests
run: |
poetry run pytest
poetry run black --check .
poetry run isort --check-only .
poetry run flake8
- name: Run security scan
run: poetry run bandit -r src/
build:
needs: test
runs-on: ubuntu-latest
if: github.event_name == 'push'
steps:
- uses: actions/checkout@v4
- name: Log in to Container Registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=sha
- name: Build and push Docker image
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
deploy:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
run: |
echo "${{ secrets.KUBECONFIG }}" | base64 -d > kubeconfig
export KUBECONFIG=kubeconfig
- name: Deploy to Kubernetes
run: |
export KUBECONFIG=kubeconfig
kubectl set image deployment/langchain-api \
langchain-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
-n langchain-prod
kubectl rollout status deployment/langchain-api -n langchain-prod
- name: Run smoke tests
run: |
# Wait for deployment
sleep 30
# Run smoke tests against production
          curl -f https://api.yourdomain.com/health
```
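The pipeline's final step shells out to `curl`; an equivalent post-deploy check can live in the repository as a small Python script. The sketch below assumes the `requests` package and the same `api.yourdomain.com` host used in the workflow, and could be invoked with `python smoke_test.py` in place of the curl call.

```python
# smoke_test.py - minimal post-deploy smoke test (sketch)
import sys

import requests


def main() -> int:
    try:
        resp = requests.get("https://api.yourdomain.com/health", timeout=10)
        resp.raise_for_status()
    except requests.RequestException as exc:
        print(f"Smoke test failed: {exc}")
        return 1
    body = resp.json()
    print(f"API status: {body.get('status')} (version {body.get('version')})")
    return 0 if body.get("status") == "healthy" else 1


if __name__ == "__main__":
    sys.exit(main())
```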
## Next Steps

Ready for security and monitoring? Continue with:
- Security and Privacy - Secure LangChain applications
- Testing and Evaluation - Test production systems
- Performance Optimization - Optimize for scale
**Key Production Patterns:**
- FastAPI architecture provides a scalable HTTP API foundation
- Connection pooling handles concurrent requests efficiently
- Redis caching improves response times and reduces costs
- Docker containerization ensures consistent deployments
- Kubernetes orchestration provides scalability and reliability
- Monitoring and logging enable production observability
- CI/CD pipelines automate testing and deployment
- Security layers protect against common vulnerabilities