Callbacks & Monitoring - Observing AI Systems
Learn to implement comprehensive monitoring, debugging, and observability for LangChain applications using callbacks, metrics, and dashboards
Understanding Callbacks & Monitoring
Callbacks provide real-time visibility into your AI systems, enabling you to track performance, debug issues, log conversations, and monitor costs. They're essential for production deployments where transparency and reliability are critical.
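Handlers can be attached in two ways: constructor-scoped (passed when a model, chain, or agent is created, so they fire on every run) or request-scoped (passed per call). A minimal sketch of both, assuming an OpenAI API key is configured; the model settings and prompt are illustrative:
python
# Minimal sketch: two ways to attach a callback handler (assumes OPENAI_API_KEY is set).
from langchain_core.callbacks import StdOutCallbackHandler
from langchain_openai import ChatOpenAI

handler = StdOutCallbackHandler()

# Constructor-scoped: fires for every call made with this model instance.
llm = ChatOpenAI(temperature=0, callbacks=[handler])

# Request-scoped: fires only for this invocation.
response = llm.invoke(
    "Summarize what a callback handler does in one sentence.",
    config={"callbacks": [handler]},
)
print(response.content)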
The Observability Problem
text
THE BLACK BOX PROBLEM (why monitoring matters)

UNMONITORED SYSTEM (black box)

    User Input --> [???] --> AI Response

    - No visibility into processing
    - Can't debug failures
    - No cost tracking
    - No performance metrics
    - Compliance issues

                |
                v  ADD CALLBACKS

MONITORED SYSTEM (full observability)

    User Input --> [Monitor] --> [Track] --> [Log] --> Response

    + Real-time visibility
    + Detailed error tracking
    + Cost monitoring
    + Performance analytics
    + Audit trails
Core Callback System
Basic Callback Implementation
python
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_core.messages import BaseMessage
from langchain_core.documents import Document
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import json
import time
import logging
# Basic logging callback
class LoggingCallbackHandler(BaseCallbackHandler):
"""Basic callback that logs all LangChain events"""
def __init__(self, log_level: str = "INFO"):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(getattr(logging, log_level))
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs: Any
) -> None:
"""Called when LLM starts processing"""
self.logger.info(f"LLM Starting: {serialized.get('name', 'Unknown')}")
self.logger.debug(f"Prompts: {prompts}")
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Called when LLM finishes processing"""
self.logger.info("LLM Completed")
if response.llm_output:
token_usage = response.llm_output.get('token_usage', {})
if token_usage:
self.logger.info(f"Token usage: {token_usage}")
def on_llm_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Called when LLM encounters an error"""
self.logger.error(f"LLM Error: {str(error)}")
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs: Any
) -> None:
"""Called when chain starts"""
chain_name = serialized.get('name', 'Unknown Chain')
self.logger.info(f"Chain Starting: {chain_name}")
self.logger.debug(f"Chain inputs: {inputs}")
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Called when chain ends"""
self.logger.info("Chain Completed")
self.logger.debug(f"Chain outputs: {outputs}")
def on_chain_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Called when chain encounters an error"""
self.logger.error(f"Chain Error: {str(error)}")
def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs: Any
) -> None:
"""Called when tool starts"""
tool_name = serialized.get('name', 'Unknown Tool')
self.logger.info(f"Tool Starting: {tool_name}")
self.logger.debug(f"Tool input: {input_str}")
def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""Called when tool ends"""
self.logger.info("Tool Completed")
self.logger.debug(f"Tool output: {output[:100]}...")
def on_tool_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Called when tool encounters an error"""
self.logger.error(f"Tool Error: {str(error)}")
def on_text(self, text: str, **kwargs: Any) -> None:
"""Called when arbitrary text is processed"""
self.logger.debug(f"Text processed: {text[:50]}...")
# Performance monitoring callback
class PerformanceCallbackHandler(BaseCallbackHandler):
"""Callback that tracks performance metrics"""
def __init__(self):
self.metrics = {
'llm_calls': 0,
'chain_calls': 0,
'tool_calls': 0,
'total_tokens': 0,
'total_cost': 0.0,
'execution_times': [],
'errors': []
}
self.start_times = {}
# Token costs (approximate)
self.token_costs = {
'gpt-3.5-turbo': {'input': 0.001, 'output': 0.002}, # per 1K tokens
'gpt-4': {'input': 0.03, 'output': 0.06},
'text-davinci-003': {'input': 0.02, 'output': 0.02}
}
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs: Any
) -> None:
"""Track LLM start"""
run_id = kwargs.get('run_id', 'unknown')
self.start_times[f"llm_{run_id}"] = time.time()
self.metrics['llm_calls'] += 1
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Track LLM completion and costs"""
run_id = kwargs.get('run_id', 'unknown')
start_time = self.start_times.pop(f"llm_{run_id}", time.time())
execution_time = time.time() - start_time
self.metrics['execution_times'].append({
'type': 'llm',
'duration': execution_time,
'timestamp': datetime.now()
})
# Track token usage and costs
if response.llm_output:
token_usage = response.llm_output.get('token_usage', {})
model_name = response.llm_output.get('model_name', 'unknown')
if token_usage:
prompt_tokens = token_usage.get('prompt_tokens', 0)
completion_tokens = token_usage.get('completion_tokens', 0)
total_tokens = token_usage.get('total_tokens', 0)
self.metrics['total_tokens'] += total_tokens
# Calculate cost
if model_name in self.token_costs:
costs = self.token_costs[model_name]
cost = (
(prompt_tokens / 1000) * costs['input'] +
(completion_tokens / 1000) * costs['output']
)
self.metrics['total_cost'] += cost
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs: Any
) -> None:
"""Track chain start"""
run_id = kwargs.get('run_id', 'unknown')
self.start_times[f"chain_{run_id}"] = time.time()
self.metrics['chain_calls'] += 1
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Track chain completion"""
run_id = kwargs.get('run_id', 'unknown')
start_time = self.start_times.pop(f"chain_{run_id}", time.time())
execution_time = time.time() - start_time
self.metrics['execution_times'].append({
'type': 'chain',
'duration': execution_time,
'timestamp': datetime.now()
})
def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs: Any
) -> None:
"""Track tool start"""
run_id = kwargs.get('run_id', 'unknown')
self.start_times[f"tool_{run_id}"] = time.time()
self.metrics['tool_calls'] += 1
def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""Track tool completion"""
run_id = kwargs.get('run_id', 'unknown')
start_time = self.start_times.pop(f"tool_{run_id}", time.time())
execution_time = time.time() - start_time
self.metrics['execution_times'].append({
'type': 'tool',
'duration': execution_time,
'timestamp': datetime.now()
})
def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
"""Track LLM errors"""
self.metrics['errors'].append({
'type': 'llm',
'error': str(error),
'timestamp': datetime.now()
})
def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
"""Track chain errors"""
self.metrics['errors'].append({
'type': 'chain',
'error': str(error),
'timestamp': datetime.now()
})
def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
"""Track tool errors"""
self.metrics['errors'].append({
'type': 'tool',
'error': str(error),
'timestamp': datetime.now()
})
def get_metrics_summary(self) -> Dict[str, Any]:
"""Get comprehensive metrics summary"""
execution_times = [m['duration'] for m in self.metrics['execution_times']]
return {
'calls': {
'llm_calls': self.metrics['llm_calls'],
'chain_calls': self.metrics['chain_calls'],
'tool_calls': self.metrics['tool_calls']
},
'performance': {
'total_execution_time': sum(execution_times),
'average_execution_time': sum(execution_times) / len(execution_times) if execution_times else 0,
'fastest_execution': min(execution_times) if execution_times else 0,
'slowest_execution': max(execution_times) if execution_times else 0
},
'usage': {
'total_tokens': self.metrics['total_tokens'],
'total_cost_usd': round(self.metrics['total_cost'], 4)
},
'errors': {
'total_errors': len(self.metrics['errors']),
'error_types': [e['type'] for e in self.metrics['errors']]
}
}
# Test basic callbacks
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
def demo_basic_callbacks():
"""Demonstrate basic callback functionality"""
# Create callbacks
logging_callback = LoggingCallbackHandler(log_level="INFO")
performance_callback = PerformanceCallbackHandler()
# Setup logging
logging.basicConfig(level=logging.INFO)
# Create LLM with callbacks
llm = ChatOpenAI(
temperature=0.7,
callbacks=[logging_callback, performance_callback]
)
# Create simple chain
prompt = PromptTemplate(
input_variables=["topic"],
template="Explain {topic} in simple terms."
)
chain = LLMChain(
llm=llm,
prompt=prompt,
callbacks=[logging_callback, performance_callback]
)
print("π Basic Callbacks Demo:")
print("========================")
# Test multiple calls
topics = ["machine learning", "quantum computing", "blockchain"]
for topic in topics:
print(f"\nProcessing topic: {topic}")
try:
result = chain.run(topic=topic)
print(f"Result: {result[:100]}...")
except Exception as e:
print(f"Error: {e}")
# Show performance metrics
metrics = performance_callback.get_metrics_summary()
print(f"\nπ Performance Metrics:")
print(json.dumps(metrics, indent=2, default=str))
return performance_callback
performance_callback = demo_basic_callbacks()
Conversation Tracking Callback
python
class ConversationTrackingCallback(BaseCallbackHandler):
"""Callback that tracks full conversation flows"""
def __init__(self, session_id: str = None):
self.session_id = session_id or f"session_{int(time.time())}"
self.conversations = []
self.current_conversation = None
self.message_count = 0
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs: Any
) -> None:
"""Start tracking a conversation"""
self.current_conversation = {
'session_id': self.session_id,
'conversation_id': f"conv_{int(time.time())}_{self.message_count}",
'start_time': datetime.now(),
'inputs': inputs,
'chain_name': serialized.get('name', 'Unknown'),
'steps': [],
'outputs': None,
'error': None,
'metadata': kwargs
}
self.message_count += 1
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs: Any
) -> None:
"""Track LLM interaction within conversation"""
if self.current_conversation:
self.current_conversation['steps'].append({
'type': 'llm_start',
'timestamp': datetime.now(),
'model': serialized.get('name', 'Unknown'),
'prompts': prompts,
'metadata': kwargs
})
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Track LLM response"""
if self.current_conversation:
self.current_conversation['steps'].append({
'type': 'llm_end',
'timestamp': datetime.now(),
'response': response.generations[0][0].text if response.generations else '',
'token_usage': response.llm_output.get('token_usage', {}) if response.llm_output else {},
'metadata': kwargs
})
def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs: Any
) -> None:
"""Track tool usage"""
if self.current_conversation:
self.current_conversation['steps'].append({
'type': 'tool_start',
'timestamp': datetime.now(),
'tool_name': serialized.get('name', 'Unknown'),
'input': input_str,
'metadata': kwargs
})
def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""Track tool output"""
if self.current_conversation:
self.current_conversation['steps'].append({
'type': 'tool_end',
'timestamp': datetime.now(),
'output': output,
'metadata': kwargs
})
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Complete conversation tracking"""
if self.current_conversation:
self.current_conversation['end_time'] = datetime.now()
self.current_conversation['outputs'] = outputs
self.current_conversation['duration'] = (
self.current_conversation['end_time'] -
self.current_conversation['start_time']
).total_seconds()
self.conversations.append(self.current_conversation)
self.current_conversation = None
def on_chain_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Track conversation errors"""
if self.current_conversation:
self.current_conversation['end_time'] = datetime.now()
self.current_conversation['error'] = str(error)
self.current_conversation['duration'] = (
self.current_conversation['end_time'] -
self.current_conversation['start_time']
).total_seconds()
self.conversations.append(self.current_conversation)
self.current_conversation = None
def get_conversation_history(self) -> List[Dict[str, Any]]:
"""Get full conversation history"""
return self.conversations
def get_session_summary(self) -> Dict[str, Any]:
"""Get session summary statistics"""
if not self.conversations:
return {"message": "No conversations recorded"}
total_duration = sum(c['duration'] for c in self.conversations)
successful_conversations = [c for c in self.conversations if not c['error']]
return {
'session_id': self.session_id,
'total_conversations': len(self.conversations),
'successful_conversations': len(successful_conversations),
'total_duration': total_duration,
'average_duration': total_duration / len(self.conversations),
'error_rate': (len(self.conversations) - len(successful_conversations)) / len(self.conversations),
'tools_used': self._get_tools_used(),
'models_used': self._get_models_used()
}
def _get_tools_used(self) -> List[str]:
"""Extract unique tools used in session"""
tools = set()
for conv in self.conversations:
for step in conv['steps']:
if step['type'] == 'tool_start':
tools.add(step['tool_name'])
return list(tools)
def _get_models_used(self) -> List[str]:
"""Extract unique models used in session"""
models = set()
for conv in self.conversations:
for step in conv['steps']:
if step['type'] == 'llm_start':
models.add(step['model'])
return list(models)
# Demo conversation tracking
def demo_conversation_tracking():
"""Demonstrate conversation tracking capabilities"""
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
# Create tracking callback
conversation_tracker = ConversationTrackingCallback("demo_session_001")
# Simple tools for demo
def calculator(expression: str) -> str:
"""Simple calculator tool"""
try:
            result = eval(expression)  # NOTE: eval is unsafe for untrusted input; demo use only
return f"Result: {result}"
except Exception as e:
return f"Error: {e}"
def weather(city: str) -> str:
"""Mock weather tool"""
return f"Weather in {city}: 72Β°F, sunny"
tools = [
Tool(name="calculator", description="Perform calculations", func=calculator),
Tool(name="weather", description="Get weather info", func=weather)
]
# Create agent with tracking
llm = ChatOpenAI(temperature=0, callbacks=[conversation_tracker])
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant with access to tools."),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
agent = create_openai_functions_agent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
callbacks=[conversation_tracker]
)
print("\nπ Conversation Tracking Demo:")
print("=============================")
# Test conversations
test_inputs = [
"What's 25 * 17?",
"What's the weather like in New York?",
"Calculate 100 / 4 and tell me about the weather in London"
]
for i, user_input in enumerate(test_inputs, 1):
print(f"\nConversation {i}: {user_input}")
try:
result = agent_executor.invoke({"input": user_input})
print(f"Response: {result['output'][:100]}...")
except Exception as e:
print(f"Error: {e}")
# Show conversation history
print("\nπ Conversation History:")
history = conversation_tracker.get_conversation_history()
for i, conv in enumerate(history, 1):
print(f"\nConversation {i}:")
print(f" Duration: {conv['duration']:.2f}s")
print(f" Steps: {len(conv['steps'])}")
if conv['error']:
print(f" Error: {conv['error']}")
else:
print(f" Success: {bool(conv['outputs'])}")
# Show session summary
summary = conversation_tracker.get_session_summary()
print(f"\nπ Session Summary:")
print(json.dumps(summary, indent=2, default=str))
return conversation_tracker
conversation_tracker = demo_conversation_tracking()
Advanced Monitoring Patterns
Real-time Dashboard Callback
python
import threading
import queue
from datetime import datetime, timedelta
from typing import Dict, Any, List
class DashboardCallback(BaseCallbackHandler):
"""Callback that feeds real-time data to a monitoring dashboard"""
def __init__(self, update_interval: int = 5):
self.update_interval = update_interval
self.metrics_queue = queue.Queue()
self.dashboard_data = {
'current_status': 'idle',
'active_chains': 0,
'recent_activity': [],
'performance_metrics': {
'avg_response_time': 0,
'requests_per_minute': 0,
'error_rate': 0,
'token_usage_rate': 0
},
'real_time_events': []
}
# Start background processor
self.processor_thread = threading.Thread(target=self._process_metrics, daemon=True)
self.processor_thread.start()
# Track active operations
self.active_operations = {}
self.recent_metrics = []
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs: Any
) -> None:
"""Track chain start for real-time monitoring"""
run_id = kwargs.get('run_id', 'unknown')
event = {
'type': 'chain_start',
'timestamp': datetime.now(),
'run_id': run_id,
'chain_name': serialized.get('name', 'Unknown'),
'inputs': inputs
}
self.active_operations[run_id] = {
'start_time': datetime.now(),
'type': 'chain',
'name': serialized.get('name', 'Unknown')
}
self.dashboard_data['active_chains'] += 1
self.dashboard_data['current_status'] = 'processing'
self._add_real_time_event(event)
self.metrics_queue.put(event)
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Track chain completion"""
run_id = kwargs.get('run_id', 'unknown')
if run_id in self.active_operations:
operation = self.active_operations.pop(run_id)
duration = (datetime.now() - operation['start_time']).total_seconds()
event = {
'type': 'chain_end',
'timestamp': datetime.now(),
'run_id': run_id,
'duration': duration,
'success': True
}
self.dashboard_data['active_chains'] = max(0, self.dashboard_data['active_chains'] - 1)
if self.dashboard_data['active_chains'] == 0:
self.dashboard_data['current_status'] = 'idle'
self._add_real_time_event(event)
self._update_performance_metrics(duration, success=True)
self.metrics_queue.put(event)
def on_chain_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Track chain errors"""
run_id = kwargs.get('run_id', 'unknown')
if run_id in self.active_operations:
operation = self.active_operations.pop(run_id)
duration = (datetime.now() - operation['start_time']).total_seconds()
event = {
'type': 'chain_error',
'timestamp': datetime.now(),
'run_id': run_id,
'duration': duration,
'error': str(error),
'success': False
}
self.dashboard_data['active_chains'] = max(0, self.dashboard_data['active_chains'] - 1)
if self.dashboard_data['active_chains'] == 0:
self.dashboard_data['current_status'] = 'idle'
self._add_real_time_event(event)
self._update_performance_metrics(duration, success=False)
self.metrics_queue.put(event)
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Track token usage"""
if response.llm_output:
token_usage = response.llm_output.get('token_usage', {})
if token_usage:
event = {
'type': 'token_usage',
'timestamp': datetime.now(),
'tokens': token_usage.get('total_tokens', 0),
'prompt_tokens': token_usage.get('prompt_tokens', 0),
'completion_tokens': token_usage.get('completion_tokens', 0)
}
self._add_real_time_event(event)
self.metrics_queue.put(event)
def _add_real_time_event(self, event: Dict[str, Any]):
"""Add event to real-time feed"""
self.dashboard_data['real_time_events'].append(event)
# Keep only last 50 events
if len(self.dashboard_data['real_time_events']) > 50:
self.dashboard_data['real_time_events'] = self.dashboard_data['real_time_events'][-50:]
# Update recent activity
activity_summary = f"{event['type']} at {event['timestamp'].strftime('%H:%M:%S')}"
self.dashboard_data['recent_activity'].append(activity_summary)
if len(self.dashboard_data['recent_activity']) > 10:
self.dashboard_data['recent_activity'] = self.dashboard_data['recent_activity'][-10:]
def _update_performance_metrics(self, duration: float, success: bool):
"""Update performance metrics"""
self.recent_metrics.append({
'duration': duration,
'success': success,
'timestamp': datetime.now()
})
# Keep metrics for last 10 minutes
cutoff_time = datetime.now() - timedelta(minutes=10)
self.recent_metrics = [
m for m in self.recent_metrics
if m['timestamp'] > cutoff_time
]
if self.recent_metrics:
# Calculate metrics
durations = [m['duration'] for m in self.recent_metrics]
successes = [m['success'] for m in self.recent_metrics]
self.dashboard_data['performance_metrics']['avg_response_time'] = sum(durations) / len(durations)
self.dashboard_data['performance_metrics']['requests_per_minute'] = len(self.recent_metrics) / 10 # 10 minute window
self.dashboard_data['performance_metrics']['error_rate'] = 1 - (sum(successes) / len(successes))
def _process_metrics(self):
"""Background thread to process metrics"""
while True:
try:
# Process queued events
while not self.metrics_queue.empty():
event = self.metrics_queue.get_nowait()
# Here you could send to external monitoring systems
# e.g., Prometheus, DataDog, CloudWatch, etc.
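                    # Example export (an assumption, not part of the original design):
                    # write each event as a JSON line so an external collector can tail
                    # the file; swap this for your real exporter (StatsD, OTLP, etc.).
                    with open("callback_events.jsonl", "a") as f:
                        f.write(json.dumps(event, default=str) + "\n")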
time.sleep(self.update_interval)
except Exception as e:
print(f"Metrics processing error: {e}")
def get_dashboard_data(self) -> Dict[str, Any]:
"""Get current dashboard data"""
return self.dashboard_data.copy()
def print_dashboard(self):
"""Print simple text dashboard"""
data = self.get_dashboard_data()
print("\n" + "="*60)
print("π₯οΈ LANGCHAIN MONITORING DASHBOARD")
print("="*60)
print(f"Status: {data['current_status'].upper()}")
print(f"Active Chains: {data['active_chains']}")
print("\nπ Performance Metrics:")
metrics = data['performance_metrics']
print(f" Avg Response Time: {metrics['avg_response_time']:.2f}s")
print(f" Requests/Minute: {metrics['requests_per_minute']:.1f}")
print(f" Error Rate: {metrics['error_rate']:.1%}")
print("\nπ Recent Activity:")
for activity in data['recent_activity'][-5:]:
print(f" β’ {activity}")
print("\nβ‘ Real-time Events (Last 5):")
for event in data['real_time_events'][-5:]:
timestamp = event['timestamp'].strftime('%H:%M:%S')
event_type = event['type']
print(f" [{timestamp}] {event_type}")
print("="*60)
# Demo real-time dashboard
def demo_dashboard_monitoring():
"""Demonstrate real-time dashboard monitoring"""
dashboard_callback = DashboardCallback(update_interval=2)
# Create simple chain with dashboard monitoring
llm = ChatOpenAI(temperature=0.7, callbacks=[dashboard_callback])
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
prompt = PromptTemplate(
input_variables=["question"],
template="Answer this question briefly: {question}"
)
chain = LLMChain(
llm=llm,
prompt=prompt,
callbacks=[dashboard_callback]
)
print("\nπ₯οΈ Real-time Dashboard Monitoring Demo:")
print("======================================")
# Simulate various workloads
questions = [
"What is artificial intelligence?",
"How does machine learning work?",
"Explain quantum computing",
"What are neural networks?",
"How does blockchain technology work?"
]
# Process questions with delays to show real-time updates
for i, question in enumerate(questions, 1):
print(f"\nProcessing question {i}...")
try:
result = chain.run(question=question)
print(f"β
Completed: {result[:50]}...")
except Exception as e:
print(f"β Error: {e}")
# Show dashboard every few requests
if i % 2 == 0:
dashboard_callback.print_dashboard()
time.sleep(1) # Small delay between requests
# Final dashboard view
print("\nπ Final Dashboard State:")
dashboard_callback.print_dashboard()
return dashboard_callback
dashboard_callback = demo_dashboard_monitoring()
Production Monitoring System
python
import sqlite3
import json
from pathlib import Path
from typing import Dict, Any, List, Optional
class ProductionMonitoringSystem:
"""Comprehensive production monitoring system"""
def __init__(self, db_path: str = "langchain_monitoring.db", retention_days: int = 30):
self.db_path = db_path
self.retention_days = retention_days
self._init_database()
# Create composite callback
self.callback = self._create_monitoring_callback()
def _init_database(self):
"""Initialize monitoring database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Execution logs table
cursor.execute("""
CREATE TABLE IF NOT EXISTS execution_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
session_id TEXT,
run_id TEXT,
event_type TEXT NOT NULL,
component_type TEXT,
component_name TEXT,
duration_ms INTEGER,
success BOOLEAN,
error_message TEXT,
input_data TEXT,
output_data TEXT,
metadata TEXT
)
""")
# Performance metrics table
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
metric_name TEXT NOT NULL,
metric_value REAL NOT NULL,
tags TEXT
)
""")
# Token usage table
cursor.execute("""
CREATE TABLE IF NOT EXISTS token_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
model_name TEXT,
prompt_tokens INTEGER,
completion_tokens INTEGER,
total_tokens INTEGER,
estimated_cost REAL
)
""")
# Alerts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
alert_type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
context TEXT,
acknowledged BOOLEAN DEFAULT FALSE
)
""")
conn.commit()
conn.close()
def _create_monitoring_callback(self):
"""Create comprehensive monitoring callback"""
class ProductionCallback(BaseCallbackHandler):
def __init__(self, parent):
self.parent = parent
self.session_id = f"session_{int(time.time())}"
self.active_runs = {}
def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
run_id = kwargs.get('run_id', 'unknown')
self.active_runs[run_id] = {
'start_time': datetime.now(),
'type': 'chain',
'name': serialized.get('name', 'Unknown')
}
self.parent._log_execution(
event_type='chain_start',
component_type='chain',
component_name=serialized.get('name', 'Unknown'),
run_id=run_id,
input_data=json.dumps(inputs),
session_id=self.session_id
)
def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
run_id = kwargs.get('run_id', 'unknown')
if run_id in self.active_runs:
run_info = self.active_runs.pop(run_id)
duration = (datetime.now() - run_info['start_time']).total_seconds() * 1000
self.parent._log_execution(
event_type='chain_end',
component_type='chain',
component_name=run_info['name'],
run_id=run_id,
duration_ms=duration,
success=True,
output_data=json.dumps(outputs),
session_id=self.session_id
)
# Check for performance alerts
self.parent._check_performance_alerts(duration, 'chain')
def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs):
run_id = kwargs.get('run_id', 'unknown')
if run_id in self.active_runs:
run_info = self.active_runs.pop(run_id)
duration = (datetime.now() - run_info['start_time']).total_seconds() * 1000
self.parent._log_execution(
event_type='chain_error',
component_type='chain',
component_name=run_info['name'],
run_id=run_id,
duration_ms=duration,
success=False,
error_message=str(error),
session_id=self.session_id
)
# Create alert for error
self.parent._create_alert(
alert_type='error',
severity='high',
message=f"Chain error: {str(error)}",
context=json.dumps({
'chain_name': run_info['name'],
'run_id': run_id,
'duration_ms': duration
})
)
def on_llm_end(self, response: LLMResult, **kwargs):
if response.llm_output:
token_usage = response.llm_output.get('token_usage', {})
model_name = response.llm_output.get('model_name', 'unknown')
if token_usage:
self.parent._log_token_usage(
model_name=model_name,
prompt_tokens=token_usage.get('prompt_tokens', 0),
completion_tokens=token_usage.get('completion_tokens', 0),
total_tokens=token_usage.get('total_tokens', 0)
)
return ProductionCallback(self)
def _log_execution(
self,
event_type: str,
component_type: str = None,
component_name: str = None,
run_id: str = None,
duration_ms: float = None,
success: bool = None,
error_message: str = None,
input_data: str = None,
output_data: str = None,
session_id: str = None,
metadata: Dict[str, Any] = None
):
"""Log execution event to database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO execution_logs
(timestamp, session_id, run_id, event_type, component_type,
component_name, duration_ms, success, error_message,
input_data, output_data, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.now(),
session_id,
run_id,
event_type,
component_type,
component_name,
duration_ms,
success,
error_message,
input_data,
output_data,
json.dumps(metadata) if metadata else None
))
conn.commit()
conn.close()
def _log_token_usage(
self,
model_name: str,
prompt_tokens: int,
completion_tokens: int,
total_tokens: int
):
"""Log token usage and estimated costs"""
# Estimate cost (simplified)
cost_per_1k_tokens = {
'gpt-3.5-turbo': 0.002,
'gpt-4': 0.06,
'text-davinci-003': 0.02
}
estimated_cost = (total_tokens / 1000) * cost_per_1k_tokens.get(model_name, 0.01)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO token_usage
(timestamp, model_name, prompt_tokens, completion_tokens,
total_tokens, estimated_cost)
VALUES (?, ?, ?, ?, ?, ?)
""", (
datetime.now(),
model_name,
prompt_tokens,
completion_tokens,
total_tokens,
estimated_cost
))
conn.commit()
conn.close()
def _create_alert(
self,
alert_type: str,
severity: str,
message: str,
context: str = None
):
"""Create monitoring alert"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO alerts (timestamp, alert_type, severity, message, context)
VALUES (?, ?, ?, ?, ?)
""", (datetime.now(), alert_type, severity, message, context))
conn.commit()
conn.close()
# Print alert for demo
print(f"π¨ ALERT [{severity.upper()}]: {message}")
def _check_performance_alerts(self, duration_ms: float, component_type: str):
"""Check for performance issues and create alerts"""
thresholds = {
'chain': 10000, # 10 seconds
'llm': 30000, # 30 seconds
'tool': 5000 # 5 seconds
}
threshold = thresholds.get(component_type, 15000)
if duration_ms > threshold:
self._create_alert(
alert_type='performance',
severity='medium',
message=f"Slow {component_type} execution: {duration_ms:.0f}ms",
context=json.dumps({
'duration_ms': duration_ms,
'threshold_ms': threshold,
'component_type': component_type
})
)
def get_performance_report(self, hours: int = 24) -> Dict[str, Any]:
"""Generate performance report for last N hours"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cutoff_time = datetime.now() - timedelta(hours=hours)
# Get execution stats
cursor.execute("""
SELECT
component_type,
COUNT(*) as total_executions,
AVG(duration_ms) as avg_duration,
MIN(duration_ms) as min_duration,
MAX(duration_ms) as max_duration,
SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as successful_executions
FROM execution_logs
WHERE timestamp > ? AND duration_ms IS NOT NULL
GROUP BY component_type
""", (cutoff_time,))
execution_stats = cursor.fetchall()
# Get token usage
cursor.execute("""
SELECT
model_name,
SUM(total_tokens) as total_tokens,
SUM(estimated_cost) as total_cost,
COUNT(*) as api_calls
FROM token_usage
WHERE timestamp > ?
GROUP BY model_name
""", (cutoff_time,))
token_stats = cursor.fetchall()
# Get alerts
cursor.execute("""
SELECT alert_type, severity, COUNT(*) as count
FROM alerts
WHERE timestamp > ?
GROUP BY alert_type, severity
""", (cutoff_time,))
alert_stats = cursor.fetchall()
conn.close()
return {
'report_period': f"Last {hours} hours",
'execution_stats': [
{
'component_type': row[0],
'total_executions': row[1],
'avg_duration_ms': row[2],
'min_duration_ms': row[3],
'max_duration_ms': row[4],
'success_rate': row[5] / row[1] if row[1] > 0 else 0
}
for row in execution_stats
],
'token_usage': [
{
'model': row[0],
'total_tokens': row[1],
'total_cost_usd': row[2],
'api_calls': row[3]
}
for row in token_stats
],
'alerts': [
{
'type': row[0],
'severity': row[1],
'count': row[2]
}
for row in alert_stats
]
}
def cleanup_old_data(self):
"""Clean up old monitoring data"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
tables = ['execution_logs', 'performance_metrics', 'token_usage', 'alerts']
for table in tables:
cursor.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff_date,))
conn.commit()
conn.close()
# Demo production monitoring
def demo_production_monitoring():
"""Demonstrate production monitoring system"""
# Create monitoring system
monitoring = ProductionMonitoringSystem(db_path="demo_monitoring.db")
# Create chain with monitoring
llm = ChatOpenAI(temperature=0.7, callbacks=[monitoring.callback])
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
prompt = PromptTemplate(
input_variables=["topic"],
template="Explain {topic} in one paragraph."
)
chain = LLMChain(
llm=llm,
prompt=prompt,
callbacks=[monitoring.callback]
)
print("\nποΈ Production Monitoring Demo:")
print("==============================")
# Test various scenarios
test_cases = [
"machine learning",
"quantum computing",
"blockchain technology",
"artificial intelligence",
"data science"
]
for topic in test_cases:
print(f"\nProcessing: {topic}")
try:
result = chain.run(topic=topic)
print(f"β
Success: {len(result)} characters")
except Exception as e:
print(f"β Error: {e}")
time.sleep(0.5)
# Generate performance report
print("\nπ Performance Report:")
print("======================")
report = monitoring.get_performance_report(hours=1)
print(json.dumps(report, indent=2, default=str))
return monitoring
production_monitoring = demo_production_monitoring()
Next Steps
Ready to build advanced AI systems? Continue with:
- Advanced RAG - Sophisticated retrieval patterns with monitoring
- Production Patterns - Deploy monitored systems at scale
- Security and Privacy - Secure your monitored applications
Key Monitoring Takeaways:
- Callbacks provide visibility into every aspect of LangChain execution
- Performance tracking is essential for production systems
- Real-time monitoring enables proactive issue detection
- Comprehensive logging supports debugging and analysis
- Alert systems notify you of issues before users do
- Cost tracking prevents unexpected billing surprises (see the sketch after this list)
- Dashboard views provide operational insights at a glance
- Historical analysis helps optimize system performance over time
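Beyond custom handlers, LangChain ships a built-in token and cost counter for OpenAI models that covers the most common cost-tracking need with a single context manager. A minimal sketch, assuming the `chain` from the demos above and a recent LangChain release (in older versions the helper is imported from `langchain.callbacks` instead):
python
# Hedged sketch: built-in OpenAI token/cost accounting (import path varies by version).
from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    chain.run(topic="machine learning")
    chain.run(topic="vector databases")

print(f"Total tokens:      {cb.total_tokens}")
print(f"Prompt tokens:     {cb.prompt_tokens}")
print(f"Completion tokens: {cb.completion_tokens}")
print(f"Estimated cost:    ${cb.total_cost:.4f}")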