Callbacks & Monitoring - Observing AI Systems

Learn to implement comprehensive monitoring, debugging, and observability for LangChain applications using callbacks, metrics, and dashboards

Understanding Callbacks & Monitoring

Callbacks provide real-time visibility into your AI systems, enabling you to track performance, debug issues, log conversations, and monitor costs. They're essential for production deployments where transparency and reliability are critical.
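
Before building custom handlers, it helps to see where callbacks plug in. Below is a minimal sketch using the built-in StdOutCallbackHandler, assuming langchain-openai is installed and OPENAI_API_KEY is set: callbacks can be scoped to a component's constructor, or passed per request through the config argument.

python
# Minimal sketch: two ways to attach callbacks (assumes langchain-openai is
# installed and OPENAI_API_KEY is set in the environment).
from langchain_core.callbacks import StdOutCallbackHandler
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

handler = StdOutCallbackHandler()

# Constructor-scoped: every call made through this model is traced
llm = ChatOpenAI(callbacks=[handler])

# Request-scoped: pass callbacks only for a single invocation
prompt = PromptTemplate.from_template("Explain {topic} in one sentence.")
chain = prompt | llm
chain.invoke({"topic": "callbacks"}, config={"callbacks": [handler]})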

πŸ” The Observability Problem ​

text
                    πŸ” THE BLACK BOX PROBLEM πŸ”
                     (Why monitoring matters)

    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                    UNMONITORED SYSTEM                          β”‚
    β”‚                      (Black box)                               β”‚
    β”‚                                                                β”‚
    β”‚  User Input β†’ [???] β†’ AI Response                              β”‚
    β”‚                                                                β”‚
    β”‚  ❌ No visibility into processing                              β”‚
    β”‚  ❌ Can't debug failures                                       β”‚
    β”‚  ❌ No cost tracking                                           β”‚
    β”‚  ❌ No performance metrics                                     β”‚
    β”‚  ❌ Compliance issues                                          β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
                         β–Ό ADD CALLBACKS
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                   MONITORED SYSTEM                             β”‚
    β”‚                   (Full observability)                         β”‚
    β”‚                                                                β”‚
    β”‚  User Input β†’ [πŸ“Š Monitor] β†’ [πŸ” Track] β†’ [πŸ“ Log] β†’ Response  β”‚
    β”‚                                                                β”‚
    β”‚  βœ… Real-time visibility                                       β”‚
    β”‚  βœ… Detailed error tracking                                    β”‚
    β”‚  βœ… Cost monitoring                                            β”‚
    β”‚  βœ… Performance analytics                                      β”‚
    β”‚  βœ… Audit trails                                               β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ—οΈ Core Callback System ​

📋 Basic Callback Implementation

python
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_core.messages import BaseMessage
from langchain_core.documents import Document
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import json
import time
import logging

# Basic logging callback
class LoggingCallbackHandler(BaseCallbackHandler):
    """Basic callback that logs all LangChain events"""
    
    def __init__(self, log_level: str = "INFO"):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(getattr(logging, log_level))
        
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def on_llm_start(
        self, 
        serialized: Dict[str, Any], 
        prompts: List[str], 
        **kwargs: Any
    ) -> None:
        """Called when LLM starts processing"""
        self.logger.info(f"LLM Starting: {serialized.get('name', 'Unknown')}")
        self.logger.debug(f"Prompts: {prompts}")
    
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Called when LLM finishes processing"""
        self.logger.info("LLM Completed")
        if response.llm_output:
            token_usage = response.llm_output.get('token_usage', {})
            if token_usage:
                self.logger.info(f"Token usage: {token_usage}")
    
    def on_llm_error(
        self, 
        error: Union[Exception, KeyboardInterrupt], 
        **kwargs: Any
    ) -> None:
        """Called when LLM encounters an error"""
        self.logger.error(f"LLM Error: {str(error)}")
    
    def on_chain_start(
        self, 
        serialized: Dict[str, Any], 
        inputs: Dict[str, Any], 
        **kwargs: Any
    ) -> None:
        """Called when chain starts"""
        chain_name = serialized.get('name', 'Unknown Chain')
        self.logger.info(f"Chain Starting: {chain_name}")
        self.logger.debug(f"Chain inputs: {inputs}")
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Called when chain ends"""
        self.logger.info("Chain Completed")
        self.logger.debug(f"Chain outputs: {outputs}")
    
    def on_chain_error(
        self, 
        error: Union[Exception, KeyboardInterrupt], 
        **kwargs: Any
    ) -> None:
        """Called when chain encounters an error"""
        self.logger.error(f"Chain Error: {str(error)}")
    
    def on_tool_start(
        self, 
        serialized: Dict[str, Any], 
        input_str: str, 
        **kwargs: Any
    ) -> None:
        """Called when tool starts"""
        tool_name = serialized.get('name', 'Unknown Tool')
        self.logger.info(f"Tool Starting: {tool_name}")
        self.logger.debug(f"Tool input: {input_str}")
    
    def on_tool_end(self, output: str, **kwargs: Any) -> None:
        """Called when tool ends"""
        self.logger.info("Tool Completed")
        self.logger.debug(f"Tool output: {output[:100]}...")
    
    def on_tool_error(
        self, 
        error: Union[Exception, KeyboardInterrupt], 
        **kwargs: Any
    ) -> None:
        """Called when tool encounters an error"""
        self.logger.error(f"Tool Error: {str(error)}")
    
    def on_text(self, text: str, **kwargs: Any) -> None:
        """Called when arbitrary text is processed"""
        self.logger.debug(f"Text processed: {text[:50]}...")

# Performance monitoring callback
class PerformanceCallbackHandler(BaseCallbackHandler):
    """Callback that tracks performance metrics"""
    
    def __init__(self):
        self.metrics = {
            'llm_calls': 0,
            'chain_calls': 0,
            'tool_calls': 0,
            'total_tokens': 0,
            'total_cost': 0.0,
            'execution_times': [],
            'errors': []
        }
        self.start_times = {}
        
        # Token costs (approximate)
        self.token_costs = {
            'gpt-3.5-turbo': {'input': 0.001, 'output': 0.002},  # per 1K tokens
            'gpt-4': {'input': 0.03, 'output': 0.06},
            'text-davinci-003': {'input': 0.02, 'output': 0.02}
        }
    
    def on_llm_start(
        self, 
        serialized: Dict[str, Any], 
        prompts: List[str], 
        **kwargs: Any
    ) -> None:
        """Track LLM start"""
        run_id = kwargs.get('run_id', 'unknown')
        self.start_times[f"llm_{run_id}"] = time.time()
        self.metrics['llm_calls'] += 1
    
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Track LLM completion and costs"""
        run_id = kwargs.get('run_id', 'unknown')
        start_time = self.start_times.pop(f"llm_{run_id}", time.time())
        execution_time = time.time() - start_time
        
        self.metrics['execution_times'].append({
            'type': 'llm',
            'duration': execution_time,
            'timestamp': datetime.now()
        })
        
        # Track token usage and costs
        if response.llm_output:
            token_usage = response.llm_output.get('token_usage', {})
            model_name = response.llm_output.get('model_name', 'unknown')
            
            if token_usage:
                prompt_tokens = token_usage.get('prompt_tokens', 0)
                completion_tokens = token_usage.get('completion_tokens', 0)
                total_tokens = token_usage.get('total_tokens', 0)
                
                self.metrics['total_tokens'] += total_tokens
                
                # Calculate cost
                if model_name in self.token_costs:
                    costs = self.token_costs[model_name]
                    cost = (
                        (prompt_tokens / 1000) * costs['input'] +
                        (completion_tokens / 1000) * costs['output']
                    )
                    self.metrics['total_cost'] += cost
    
    def on_chain_start(
        self, 
        serialized: Dict[str, Any], 
        inputs: Dict[str, Any], 
        **kwargs: Any
    ) -> None:
        """Track chain start"""
        run_id = kwargs.get('run_id', 'unknown')
        self.start_times[f"chain_{run_id}"] = time.time()
        self.metrics['chain_calls'] += 1
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Track chain completion"""
        run_id = kwargs.get('run_id', 'unknown')
        start_time = self.start_times.pop(f"chain_{run_id}", time.time())
        execution_time = time.time() - start_time
        
        self.metrics['execution_times'].append({
            'type': 'chain',
            'duration': execution_time,
            'timestamp': datetime.now()
        })
    
    def on_tool_start(
        self, 
        serialized: Dict[str, Any], 
        input_str: str, 
        **kwargs: Any
    ) -> None:
        """Track tool start"""
        run_id = kwargs.get('run_id', 'unknown')
        self.start_times[f"tool_{run_id}"] = time.time()
        self.metrics['tool_calls'] += 1
    
    def on_tool_end(self, output: str, **kwargs: Any) -> None:
        """Track tool completion"""
        run_id = kwargs.get('run_id', 'unknown')
        start_time = self.start_times.pop(f"tool_{run_id}", time.time())
        execution_time = time.time() - start_time
        
        self.metrics['execution_times'].append({
            'type': 'tool',
            'duration': execution_time,
            'timestamp': datetime.now()
        })
    
    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Track LLM errors"""
        self.metrics['errors'].append({
            'type': 'llm',
            'error': str(error),
            'timestamp': datetime.now()
        })
    
    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Track chain errors"""
        self.metrics['errors'].append({
            'type': 'chain',
            'error': str(error),
            'timestamp': datetime.now()
        })
    
    def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Track tool errors"""
        self.metrics['errors'].append({
            'type': 'tool',
            'error': str(error),
            'timestamp': datetime.now()
        })
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get comprehensive metrics summary"""
        execution_times = [m['duration'] for m in self.metrics['execution_times']]
        
        return {
            'calls': {
                'llm_calls': self.metrics['llm_calls'],
                'chain_calls': self.metrics['chain_calls'],
                'tool_calls': self.metrics['tool_calls']
            },
            'performance': {
                'total_execution_time': sum(execution_times),
                'average_execution_time': sum(execution_times) / len(execution_times) if execution_times else 0,
                'fastest_execution': min(execution_times) if execution_times else 0,
                'slowest_execution': max(execution_times) if execution_times else 0
            },
            'usage': {
                'total_tokens': self.metrics['total_tokens'],
                'total_cost_usd': round(self.metrics['total_cost'], 4)
            },
            'errors': {
                'total_errors': len(self.metrics['errors']),
                'error_types': [e['type'] for e in self.metrics['errors']]
            }
        }

# Test basic callbacks
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate

def demo_basic_callbacks():
    """Demonstrate basic callback functionality"""
    
    # Create callbacks
    logging_callback = LoggingCallbackHandler(log_level="INFO")
    performance_callback = PerformanceCallbackHandler()
    
    # Setup logging
    logging.basicConfig(level=logging.INFO)
    
    # Create LLM with callbacks
    llm = ChatOpenAI(
        temperature=0.7,
        callbacks=[logging_callback, performance_callback]
    )
    
    # Create simple chain
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Explain {topic} in simple terms."
    )
    
    chain = LLMChain(
        llm=llm,
        prompt=prompt,
        callbacks=[logging_callback, performance_callback]
    )
    
    print("πŸ“‹ Basic Callbacks Demo:")
    print("========================")
    
    # Test multiple calls
    topics = ["machine learning", "quantum computing", "blockchain"]
    
    for topic in topics:
        print(f"\nProcessing topic: {topic}")
        try:
            result = chain.run(topic=topic)
            print(f"Result: {result[:100]}...")
        except Exception as e:
            print(f"Error: {e}")
    
    # Show performance metrics
    metrics = performance_callback.get_metrics_summary()
    print(f"\nπŸ“Š Performance Metrics:")
    print(json.dumps(metrics, indent=2, default=str))
    
    return performance_callback

performance_callback = demo_basic_callbacks()
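
If you only need OpenAI token counts and spend, LangChain also ships a ready-made context manager that aggregates usage for everything run inside it, which can be simpler than maintaining a custom cost table. A hedged sketch (the exact import path has moved between releases; in recent versions it lives in langchain_community):

python
# Built-in token/cost tracking for OpenAI models (import path varies by release).
from langchain_community.callbacks import get_openai_callback
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

with get_openai_callback() as cb:
    llm.invoke("Explain machine learning in one sentence.")
    print(f"Tokens: {cb.total_tokens} "
          f"(prompt {cb.prompt_tokens}, completion {cb.completion_tokens})")
    print(f"Estimated cost: ${cb.total_cost:.4f}")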

🔄 Conversation Tracking Callback

python
class ConversationTrackingCallback(BaseCallbackHandler):
    """Callback that tracks full conversation flows"""
    
    def __init__(self, session_id: str = None):
        self.session_id = session_id or f"session_{int(time.time())}"
        self.conversations = []
        self.current_conversation = None
        self.message_count = 0
    
    def on_chain_start(
        self, 
        serialized: Dict[str, Any], 
        inputs: Dict[str, Any], 
        **kwargs: Any
    ) -> None:
        """Start tracking a conversation"""
        self.current_conversation = {
            'session_id': self.session_id,
            'conversation_id': f"conv_{int(time.time())}_{self.message_count}",
            'start_time': datetime.now(),
            'inputs': inputs,
            'chain_name': serialized.get('name', 'Unknown'),
            'steps': [],
            'outputs': None,
            'error': None,
            'metadata': kwargs
        }
        self.message_count += 1
    
    def on_llm_start(
        self, 
        serialized: Dict[str, Any], 
        prompts: List[str], 
        **kwargs: Any
    ) -> None:
        """Track LLM interaction within conversation"""
        if self.current_conversation:
            self.current_conversation['steps'].append({
                'type': 'llm_start',
                'timestamp': datetime.now(),
                'model': serialized.get('name', 'Unknown'),
                'prompts': prompts,
                'metadata': kwargs
            })
    
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Track LLM response"""
        if self.current_conversation:
            self.current_conversation['steps'].append({
                'type': 'llm_end',
                'timestamp': datetime.now(),
                'response': response.generations[0][0].text if response.generations else '',
                'token_usage': response.llm_output.get('token_usage', {}) if response.llm_output else {},
                'metadata': kwargs
            })
    
    def on_tool_start(
        self, 
        serialized: Dict[str, Any], 
        input_str: str, 
        **kwargs: Any
    ) -> None:
        """Track tool usage"""
        if self.current_conversation:
            self.current_conversation['steps'].append({
                'type': 'tool_start',
                'timestamp': datetime.now(),
                'tool_name': serialized.get('name', 'Unknown'),
                'input': input_str,
                'metadata': kwargs
            })
    
    def on_tool_end(self, output: str, **kwargs: Any) -> None:
        """Track tool output"""
        if self.current_conversation:
            self.current_conversation['steps'].append({
                'type': 'tool_end',
                'timestamp': datetime.now(),
                'output': output,
                'metadata': kwargs
            })
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Complete conversation tracking"""
        if self.current_conversation:
            self.current_conversation['end_time'] = datetime.now()
            self.current_conversation['outputs'] = outputs
            self.current_conversation['duration'] = (
                self.current_conversation['end_time'] - 
                self.current_conversation['start_time']
            ).total_seconds()
            
            self.conversations.append(self.current_conversation)
            self.current_conversation = None
    
    def on_chain_error(
        self, 
        error: Union[Exception, KeyboardInterrupt], 
        **kwargs: Any
    ) -> None:
        """Track conversation errors"""
        if self.current_conversation:
            self.current_conversation['end_time'] = datetime.now()
            self.current_conversation['error'] = str(error)
            self.current_conversation['duration'] = (
                self.current_conversation['end_time'] - 
                self.current_conversation['start_time']
            ).total_seconds()
            
            self.conversations.append(self.current_conversation)
            self.current_conversation = None
    
    def get_conversation_history(self) -> List[Dict[str, Any]]:
        """Get full conversation history"""
        return self.conversations
    
    def get_session_summary(self) -> Dict[str, Any]:
        """Get session summary statistics"""
        if not self.conversations:
            return {"message": "No conversations recorded"}
        
        total_duration = sum(c['duration'] for c in self.conversations)
        successful_conversations = [c for c in self.conversations if not c['error']]
        
        return {
            'session_id': self.session_id,
            'total_conversations': len(self.conversations),
            'successful_conversations': len(successful_conversations),
            'total_duration': total_duration,
            'average_duration': total_duration / len(self.conversations),
            'error_rate': (len(self.conversations) - len(successful_conversations)) / len(self.conversations),
            'tools_used': self._get_tools_used(),
            'models_used': self._get_models_used()
        }
    
    def _get_tools_used(self) -> List[str]:
        """Extract unique tools used in session"""
        tools = set()
        for conv in self.conversations:
            for step in conv['steps']:
                if step['type'] == 'tool_start':
                    tools.add(step['tool_name'])
        return list(tools)
    
    def _get_models_used(self) -> List[str]:
        """Extract unique models used in session"""
        models = set()
        for conv in self.conversations:
            for step in conv['steps']:
                if step['type'] == 'llm_start':
                    models.add(step['model'])
        return list(models)

# Demo conversation tracking
def demo_conversation_tracking():
    """Demonstrate conversation tracking capabilities"""
    
    from langchain.agents import AgentExecutor, create_openai_functions_agent
    from langchain.tools import Tool
    
    # Create tracking callback
    conversation_tracker = ConversationTrackingCallback("demo_session_001")
    
    # Simple tools for demo
    def calculator(expression: str) -> str:
        """Simple calculator tool"""
        try:
            result = eval(expression)  # demo only: eval is unsafe for untrusted input
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {e}"
    
    def weather(city: str) -> str:
        """Mock weather tool"""
        return f"Weather in {city}: 72Β°F, sunny"
    
    tools = [
        Tool(name="calculator", description="Perform calculations", func=calculator),
        Tool(name="weather", description="Get weather info", func=weather)
    ]
    
    # Create agent with tracking
    llm = ChatOpenAI(temperature=0, callbacks=[conversation_tracker])
    
    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant with access to tools."),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad")
    ])
    
    agent = create_openai_functions_agent(llm=llm, tools=tools, prompt=prompt)
    agent_executor = AgentExecutor(
        agent=agent, 
        tools=tools, 
        callbacks=[conversation_tracker]
    )
    
    print("\nπŸ”„ Conversation Tracking Demo:")
    print("=============================")
    
    # Test conversations
    test_inputs = [
        "What's 25 * 17?",
        "What's the weather like in New York?",
        "Calculate 100 / 4 and tell me about the weather in London"
    ]
    
    for i, user_input in enumerate(test_inputs, 1):
        print(f"\nConversation {i}: {user_input}")
        try:
            result = agent_executor.invoke({"input": user_input})
            print(f"Response: {result['output'][:100]}...")
        except Exception as e:
            print(f"Error: {e}")
    
    # Show conversation history
    print("\nπŸ“Š Conversation History:")
    history = conversation_tracker.get_conversation_history()
    for i, conv in enumerate(history, 1):
        print(f"\nConversation {i}:")
        print(f"  Duration: {conv['duration']:.2f}s")
        print(f"  Steps: {len(conv['steps'])}")
        if conv['error']:
            print(f"  Error: {conv['error']}")
        else:
            print(f"  Success: {bool(conv['outputs'])}")
    
    # Show session summary
    summary = conversation_tracker.get_session_summary()
    print(f"\nπŸ“ˆ Session Summary:")
    print(json.dumps(summary, indent=2, default=str))
    
    return conversation_tracker

conversation_tracker = demo_conversation_tracking()
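
Because the tracker stores plain dictionaries, exporting the captured history for audit or offline analysis is straightforward. A small sketch follows; the file name and helper function are illustrative, not part of LangChain:

python
# Persist tracked conversations as JSON Lines for audit trails / offline analysis.
import json
from pathlib import Path

def export_conversations(tracker: ConversationTrackingCallback,
                         path: str = "conversations.jsonl") -> int:
    """Append every recorded conversation to a JSONL file; returns the count written."""
    records = tracker.get_conversation_history()
    with Path(path).open("a", encoding="utf-8") as f:
        for conv in records:
            # default=str serializes the datetime objects captured by the callback
            f.write(json.dumps(conv, default=str) + "\n")
    return len(records)

print(f"Exported {export_conversations(conversation_tracker)} conversations")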

📊 Advanced Monitoring Patterns

🎯 Real-time Dashboard Callback

python
import threading
import queue
from datetime import datetime, timedelta
from typing import Dict, Any, List

class DashboardCallback(BaseCallbackHandler):
    """Callback that feeds real-time data to a monitoring dashboard"""
    
    def __init__(self, update_interval: int = 5):
        self.update_interval = update_interval
        self.metrics_queue = queue.Queue()
        self.dashboard_data = {
            'current_status': 'idle',
            'active_chains': 0,
            'recent_activity': [],
            'performance_metrics': {
                'avg_response_time': 0,
                'requests_per_minute': 0,
                'error_rate': 0,
                'token_usage_rate': 0
            },
            'real_time_events': []
        }
        
        # Start background processor
        self.processor_thread = threading.Thread(target=self._process_metrics, daemon=True)
        self.processor_thread.start()
        
        # Track active operations
        self.active_operations = {}
        self.recent_metrics = []
    
    def on_chain_start(
        self, 
        serialized: Dict[str, Any], 
        inputs: Dict[str, Any], 
        **kwargs: Any
    ) -> None:
        """Track chain start for real-time monitoring"""
        run_id = kwargs.get('run_id', 'unknown')
        
        event = {
            'type': 'chain_start',
            'timestamp': datetime.now(),
            'run_id': run_id,
            'chain_name': serialized.get('name', 'Unknown'),
            'inputs': inputs
        }
        
        self.active_operations[run_id] = {
            'start_time': datetime.now(),
            'type': 'chain',
            'name': serialized.get('name', 'Unknown')
        }
        
        self.dashboard_data['active_chains'] += 1
        self.dashboard_data['current_status'] = 'processing'
        
        self._add_real_time_event(event)
        self.metrics_queue.put(event)
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Track chain completion"""
        run_id = kwargs.get('run_id', 'unknown')
        
        if run_id in self.active_operations:
            operation = self.active_operations.pop(run_id)
            duration = (datetime.now() - operation['start_time']).total_seconds()
            
            event = {
                'type': 'chain_end',
                'timestamp': datetime.now(),
                'run_id': run_id,
                'duration': duration,
                'success': True
            }
            
            self.dashboard_data['active_chains'] = max(0, self.dashboard_data['active_chains'] - 1)
            if self.dashboard_data['active_chains'] == 0:
                self.dashboard_data['current_status'] = 'idle'
            
            self._add_real_time_event(event)
            self._update_performance_metrics(duration, success=True)
            self.metrics_queue.put(event)
    
    def on_chain_error(
        self, 
        error: Union[Exception, KeyboardInterrupt], 
        **kwargs: Any
    ) -> None:
        """Track chain errors"""
        run_id = kwargs.get('run_id', 'unknown')
        
        if run_id in self.active_operations:
            operation = self.active_operations.pop(run_id)
            duration = (datetime.now() - operation['start_time']).total_seconds()
            
            event = {
                'type': 'chain_error',
                'timestamp': datetime.now(),
                'run_id': run_id,
                'duration': duration,
                'error': str(error),
                'success': False
            }
            
            self.dashboard_data['active_chains'] = max(0, self.dashboard_data['active_chains'] - 1)
            if self.dashboard_data['active_chains'] == 0:
                self.dashboard_data['current_status'] = 'idle'
            
            self._add_real_time_event(event)
            self._update_performance_metrics(duration, success=False)
            self.metrics_queue.put(event)
    
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Track token usage"""
        if response.llm_output:
            token_usage = response.llm_output.get('token_usage', {})
            if token_usage:
                event = {
                    'type': 'token_usage',
                    'timestamp': datetime.now(),
                    'tokens': token_usage.get('total_tokens', 0),
                    'prompt_tokens': token_usage.get('prompt_tokens', 0),
                    'completion_tokens': token_usage.get('completion_tokens', 0)
                }
                
                self._add_real_time_event(event)
                self.metrics_queue.put(event)
    
    def _add_real_time_event(self, event: Dict[str, Any]):
        """Add event to real-time feed"""
        self.dashboard_data['real_time_events'].append(event)
        
        # Keep only last 50 events
        if len(self.dashboard_data['real_time_events']) > 50:
            self.dashboard_data['real_time_events'] = self.dashboard_data['real_time_events'][-50:]
        
        # Update recent activity
        activity_summary = f"{event['type']} at {event['timestamp'].strftime('%H:%M:%S')}"
        self.dashboard_data['recent_activity'].append(activity_summary)
        
        if len(self.dashboard_data['recent_activity']) > 10:
            self.dashboard_data['recent_activity'] = self.dashboard_data['recent_activity'][-10:]
    
    def _update_performance_metrics(self, duration: float, success: bool):
        """Update performance metrics"""
        self.recent_metrics.append({
            'duration': duration,
            'success': success,
            'timestamp': datetime.now()
        })
        
        # Keep metrics for last 10 minutes
        cutoff_time = datetime.now() - timedelta(minutes=10)
        self.recent_metrics = [
            m for m in self.recent_metrics 
            if m['timestamp'] > cutoff_time
        ]
        
        if self.recent_metrics:
            # Calculate metrics
            durations = [m['duration'] for m in self.recent_metrics]
            successes = [m['success'] for m in self.recent_metrics]
            
            self.dashboard_data['performance_metrics']['avg_response_time'] = sum(durations) / len(durations)
            self.dashboard_data['performance_metrics']['requests_per_minute'] = len(self.recent_metrics) / 10  # 10 minute window
            self.dashboard_data['performance_metrics']['error_rate'] = 1 - (sum(successes) / len(successes))
    
    def _process_metrics(self):
        """Background thread to process metrics"""
        while True:
            try:
                # Process queued events
                while not self.metrics_queue.empty():
                    event = self.metrics_queue.get_nowait()
                    # Here you could send to external monitoring systems
                    # e.g., Prometheus, DataDog, CloudWatch, etc.
                
                time.sleep(self.update_interval)
            except Exception as e:
                print(f"Metrics processing error: {e}")
    
    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get current dashboard data"""
        return self.dashboard_data.copy()
    
    def print_dashboard(self):
        """Print simple text dashboard"""
        data = self.get_dashboard_data()
        
        print("\n" + "="*60)
        print("πŸ–₯️  LANGCHAIN MONITORING DASHBOARD")
        print("="*60)
        
        print(f"Status: {data['current_status'].upper()}")
        print(f"Active Chains: {data['active_chains']}")
        
        print("\nπŸ“Š Performance Metrics:")
        metrics = data['performance_metrics']
        print(f"  Avg Response Time: {metrics['avg_response_time']:.2f}s")
        print(f"  Requests/Minute: {metrics['requests_per_minute']:.1f}")
        print(f"  Error Rate: {metrics['error_rate']:.1%}")
        
        print("\nπŸ”„ Recent Activity:")
        for activity in data['recent_activity'][-5:]:
            print(f"  β€’ {activity}")
        
        print("\n⚑ Real-time Events (Last 5):")
        for event in data['real_time_events'][-5:]:
            timestamp = event['timestamp'].strftime('%H:%M:%S')
            event_type = event['type']
            print(f"  [{timestamp}] {event_type}")
        
        print("="*60)

# Demo real-time dashboard
def demo_dashboard_monitoring():
    """Demonstrate real-time dashboard monitoring"""
    
    dashboard_callback = DashboardCallback(update_interval=2)
    
    # Create simple chain with dashboard monitoring
    llm = ChatOpenAI(temperature=0.7, callbacks=[dashboard_callback])
    
    from langchain.chains import LLMChain
    from langchain_core.prompts import PromptTemplate
    
    prompt = PromptTemplate(
        input_variables=["question"],
        template="Answer this question briefly: {question}"
    )
    
    chain = LLMChain(
        llm=llm,
        prompt=prompt,
        callbacks=[dashboard_callback]
    )
    
    print("\nπŸ–₯️ Real-time Dashboard Monitoring Demo:")
    print("======================================")
    
    # Simulate various workloads
    questions = [
        "What is artificial intelligence?",
        "How does machine learning work?",
        "Explain quantum computing",
        "What are neural networks?",
        "How does blockchain technology work?"
    ]
    
    # Process questions with delays to show real-time updates
    for i, question in enumerate(questions, 1):
        print(f"\nProcessing question {i}...")
        
        try:
            result = chain.run(question=question)
            print(f"βœ… Completed: {result[:50]}...")
        except Exception as e:
            print(f"❌ Error: {e}")
        
        # Show dashboard every few requests
        if i % 2 == 0:
            dashboard_callback.print_dashboard()
        
        time.sleep(1)  # Small delay between requests
    
    # Final dashboard view
    print("\nπŸ” Final Dashboard State:")
    dashboard_callback.print_dashboard()
    
    return dashboard_callback

dashboard_callback = demo_dashboard_monitoring()
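
The `_process_metrics` loop above deliberately leaves the hand-off to external systems as a comment. A hedged sketch of what that hand-off could look like, using only the standard library and a hypothetical HTTP collector endpoint (call it from inside the queue-draining loop):

python
# Hypothetical exporter: forward each queued dashboard event to an external
# HTTP collector. The endpoint URL is illustrative — substitute your Prometheus
# pushgateway, DataDog intake, CloudWatch agent, or similar.
import json
import logging
import urllib.request
from typing import Any, Dict

def ship_event(event: Dict[str, Any],
               endpoint: str = "http://localhost:9091/events") -> None:
    """POST one callback event as JSON; failures are logged, never raised."""
    payload = json.dumps(event, default=str).encode("utf-8")
    request = urllib.request.Request(
        endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        urllib.request.urlopen(request, timeout=2)
    except Exception as exc:  # monitoring must never break the main workload
        logging.getLogger(__name__).warning(f"Event export failed: {exc}")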

πŸ—οΈ Production Monitoring System ​

python
import sqlite3
import json
from pathlib import Path
from typing import Dict, Any, List, Optional

class ProductionMonitoringSystem:
    """Comprehensive production monitoring system"""
    
    def __init__(self, db_path: str = "langchain_monitoring.db", retention_days: int = 30):
        self.db_path = db_path
        self.retention_days = retention_days
        self._init_database()
        
        # Create composite callback
        self.callback = self._create_monitoring_callback()
    
    def _init_database(self):
        """Initialize monitoring database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Execution logs table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS execution_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL,
                session_id TEXT,
                run_id TEXT,
                event_type TEXT NOT NULL,
                component_type TEXT,
                component_name TEXT,
                duration_ms INTEGER,
                success BOOLEAN,
                error_message TEXT,
                input_data TEXT,
                output_data TEXT,
                metadata TEXT
            )
        """)
        
        # Performance metrics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS performance_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL,
                metric_name TEXT NOT NULL,
                metric_value REAL NOT NULL,
                tags TEXT
            )
        """)
        
        # Token usage table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS token_usage (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL,
                model_name TEXT,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                estimated_cost REAL
            )
        """)
        
        # Alerts table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL,
                alert_type TEXT NOT NULL,
                severity TEXT NOT NULL,
                message TEXT NOT NULL,
                context TEXT,
                acknowledged BOOLEAN DEFAULT FALSE
            )
        """)
        
        conn.commit()
        conn.close()
    
    def _create_monitoring_callback(self):
        """Create comprehensive monitoring callback"""
        
        class ProductionCallback(BaseCallbackHandler):
            def __init__(self, parent):
                self.parent = parent
                self.session_id = f"session_{int(time.time())}"
                self.active_runs = {}
            
            def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
                run_id = kwargs.get('run_id', 'unknown')
                self.active_runs[run_id] = {
                    'start_time': datetime.now(),
                    'type': 'chain',
                    'name': serialized.get('name', 'Unknown')
                }
                
                self.parent._log_execution(
                    event_type='chain_start',
                    component_type='chain',
                    component_name=serialized.get('name', 'Unknown'),
                    run_id=run_id,
                    input_data=json.dumps(inputs),
                    session_id=self.session_id
                )
            
            def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
                run_id = kwargs.get('run_id', 'unknown')
                
                if run_id in self.active_runs:
                    run_info = self.active_runs.pop(run_id)
                    duration = (datetime.now() - run_info['start_time']).total_seconds() * 1000
                    
                    self.parent._log_execution(
                        event_type='chain_end',
                        component_type='chain',
                        component_name=run_info['name'],
                        run_id=run_id,
                        duration_ms=duration,
                        success=True,
                        output_data=json.dumps(outputs),
                        session_id=self.session_id
                    )
                    
                    # Check for performance alerts
                    self.parent._check_performance_alerts(duration, 'chain')
            
            def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs):
                run_id = kwargs.get('run_id', 'unknown')
                
                if run_id in self.active_runs:
                    run_info = self.active_runs.pop(run_id)
                    duration = (datetime.now() - run_info['start_time']).total_seconds() * 1000
                    
                    self.parent._log_execution(
                        event_type='chain_error',
                        component_type='chain',
                        component_name=run_info['name'],
                        run_id=run_id,
                        duration_ms=duration,
                        success=False,
                        error_message=str(error),
                        session_id=self.session_id
                    )
                    
                    # Create alert for error
                    self.parent._create_alert(
                        alert_type='error',
                        severity='high',
                        message=f"Chain error: {str(error)}",
                        context=json.dumps({
                            'chain_name': run_info['name'],
                            'run_id': run_id,
                            'duration_ms': duration
                        })
                    )
            
            def on_llm_end(self, response: LLMResult, **kwargs):
                if response.llm_output:
                    token_usage = response.llm_output.get('token_usage', {})
                    model_name = response.llm_output.get('model_name', 'unknown')
                    
                    if token_usage:
                        self.parent._log_token_usage(
                            model_name=model_name,
                            prompt_tokens=token_usage.get('prompt_tokens', 0),
                            completion_tokens=token_usage.get('completion_tokens', 0),
                            total_tokens=token_usage.get('total_tokens', 0)
                        )
        
        return ProductionCallback(self)
    
    def _log_execution(
        self, 
        event_type: str, 
        component_type: str = None,
        component_name: str = None,
        run_id: str = None,
        duration_ms: float = None,
        success: bool = None,
        error_message: str = None,
        input_data: str = None,
        output_data: str = None,
        session_id: str = None,
        metadata: Dict[str, Any] = None
    ):
        """Log execution event to database"""
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO execution_logs 
            (timestamp, session_id, run_id, event_type, component_type, 
             component_name, duration_ms, success, error_message, 
             input_data, output_data, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            datetime.now(),
            session_id,
            run_id,
            event_type,
            component_type,
            component_name,
            duration_ms,
            success,
            error_message,
            input_data,
            output_data,
            json.dumps(metadata) if metadata else None
        ))
        
        conn.commit()
        conn.close()
    
    def _log_token_usage(
        self, 
        model_name: str, 
        prompt_tokens: int, 
        completion_tokens: int, 
        total_tokens: int
    ):
        """Log token usage and estimated costs"""
        
        # Estimate cost (simplified)
        cost_per_1k_tokens = {
            'gpt-3.5-turbo': 0.002,
            'gpt-4': 0.06,
            'text-davinci-003': 0.02
        }
        
        estimated_cost = (total_tokens / 1000) * cost_per_1k_tokens.get(model_name, 0.01)
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO token_usage 
            (timestamp, model_name, prompt_tokens, completion_tokens, 
             total_tokens, estimated_cost)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            datetime.now(),
            model_name,
            prompt_tokens,
            completion_tokens,
            total_tokens,
            estimated_cost
        ))
        
        conn.commit()
        conn.close()
    
    def _create_alert(
        self, 
        alert_type: str, 
        severity: str, 
        message: str, 
        context: str = None
    ):
        """Create monitoring alert"""
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO alerts (timestamp, alert_type, severity, message, context)
            VALUES (?, ?, ?, ?, ?)
        """, (datetime.now(), alert_type, severity, message, context))
        
        conn.commit()
        conn.close()
        
        # Print alert for demo
        print(f"🚨 ALERT [{severity.upper()}]: {message}")
    
    def _check_performance_alerts(self, duration_ms: float, component_type: str):
        """Check for performance issues and create alerts"""
        
        thresholds = {
            'chain': 10000,  # 10 seconds
            'llm': 30000,    # 30 seconds
            'tool': 5000     # 5 seconds
        }
        
        threshold = thresholds.get(component_type, 15000)
        
        if duration_ms > threshold:
            self._create_alert(
                alert_type='performance',
                severity='medium',
                message=f"Slow {component_type} execution: {duration_ms:.0f}ms",
                context=json.dumps({
                    'duration_ms': duration_ms,
                    'threshold_ms': threshold,
                    'component_type': component_type
                })
            )
    
    def get_performance_report(self, hours: int = 24) -> Dict[str, Any]:
        """Generate performance report for last N hours"""
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        # Get execution stats
        cursor.execute("""
            SELECT 
                component_type,
                COUNT(*) as total_executions,
                AVG(duration_ms) as avg_duration,
                MIN(duration_ms) as min_duration,
                MAX(duration_ms) as max_duration,
                SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as successful_executions
            FROM execution_logs 
            WHERE timestamp > ? AND duration_ms IS NOT NULL
            GROUP BY component_type
        """, (cutoff_time,))
        
        execution_stats = cursor.fetchall()
        
        # Get token usage
        cursor.execute("""
            SELECT 
                model_name,
                SUM(total_tokens) as total_tokens,
                SUM(estimated_cost) as total_cost,
                COUNT(*) as api_calls
            FROM token_usage 
            WHERE timestamp > ?
            GROUP BY model_name
        """, (cutoff_time,))
        
        token_stats = cursor.fetchall()
        
        # Get alerts
        cursor.execute("""
            SELECT alert_type, severity, COUNT(*) as count
            FROM alerts 
            WHERE timestamp > ?
            GROUP BY alert_type, severity
        """, (cutoff_time,))
        
        alert_stats = cursor.fetchall()
        
        conn.close()
        
        return {
            'report_period': f"Last {hours} hours",
            'execution_stats': [
                {
                    'component_type': row[0],
                    'total_executions': row[1],
                    'avg_duration_ms': row[2],
                    'min_duration_ms': row[3],
                    'max_duration_ms': row[4],
                    'success_rate': row[5] / row[1] if row[1] > 0 else 0
                }
                for row in execution_stats
            ],
            'token_usage': [
                {
                    'model': row[0],
                    'total_tokens': row[1],
                    'total_cost_usd': row[2],
                    'api_calls': row[3]
                }
                for row in token_stats
            ],
            'alerts': [
                {
                    'type': row[0],
                    'severity': row[1],
                    'count': row[2]
                }
                for row in alert_stats
            ]
        }
    
    def cleanup_old_data(self):
        """Clean up old monitoring data"""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        tables = ['execution_logs', 'performance_metrics', 'token_usage', 'alerts']
        
        for table in tables:
            cursor.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff_date,))
        
        conn.commit()
        conn.close()

# Demo production monitoring
def demo_production_monitoring():
    """Demonstrate production monitoring system"""
    
    # Create monitoring system
    monitoring = ProductionMonitoringSystem(db_path="demo_monitoring.db")
    
    # Create chain with monitoring
    llm = ChatOpenAI(temperature=0.7, callbacks=[monitoring.callback])
    
    from langchain.chains import LLMChain
    from langchain_core.prompts import PromptTemplate
    
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Explain {topic} in one paragraph."
    )
    
    chain = LLMChain(
        llm=llm,
        prompt=prompt,
        callbacks=[monitoring.callback]
    )
    
    print("\nπŸ—οΈ Production Monitoring Demo:")
    print("==============================")
    
    # Test various scenarios
    test_cases = [
        "machine learning",
        "quantum computing", 
        "blockchain technology",
        "artificial intelligence",
        "data science"
    ]
    
    for topic in test_cases:
        print(f"\nProcessing: {topic}")
        try:
            result = chain.run(topic=topic)
            print(f"βœ… Success: {len(result)} characters")
        except Exception as e:
            print(f"❌ Error: {e}")
        
        time.sleep(0.5)
    
    # Generate performance report
    print("\nπŸ“Š Performance Report:")
    print("======================")
    
    report = monitoring.get_performance_report(hours=1)
    print(json.dumps(report, indent=2, default=str))
    
    return monitoring

production_monitoring = demo_production_monitoring()
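
In production, the report and retention methods would typically run on a schedule rather than ad hoc. A small sketch of a daily maintenance job built on the system created above; the report file name and scheduling approach (cron, APScheduler, etc.) are assumptions, not part of the monitoring system itself:

python
# Illustrative daily maintenance: archive a 24-hour report, then prune old rows.
import json
from datetime import datetime

def run_daily_maintenance(monitoring: ProductionMonitoringSystem) -> None:
    report = monitoring.get_performance_report(hours=24)
    report_path = f"report_{datetime.now().strftime('%Y%m%d')}.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, default=str)
    monitoring.cleanup_old_data()  # enforce the configured retention window
    print(f"Saved {report_path}; pruned data older than {monitoring.retention_days} days")

run_daily_maintenance(production_monitoring)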

🔗 Next Steps

Key Monitoring Takeaways:

  • Callbacks provide visibility into every aspect of LangChain execution
  • Performance tracking is essential for production systems
  • Real-time monitoring enables proactive issue detection
  • Comprehensive logging supports debugging and analysis
  • Alert systems notify you of issues before users do
  • Cost tracking prevents unexpected billing surprises
  • Dashboard views provide operational insights at a glance
  • Historical analysis helps optimize system performance over time
