LCEL Advanced Patterns - Mastering Chain Composition β
Deep dive into advanced LangChain Expression Language patterns for production-ready AI applications
π Advanced LCEL Topics Overview β
This guide covers the advanced patterns and techniques you need to build sophisticated, production-ready LangChain applications using LCEL.
Prerequisites: Basic understanding of LCEL fundamentals from LCEL Introduction
π― ADVANCED LCEL MASTERY ROADMAP π―
(From Intermediate to Expert)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π BATCH PROCESSING β
β β’ Process multiple inputs efficiently β
β β’ Optimize throughput and performance β
β β’ Handle large datasets with LCEL β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β β‘ STREAMING PATTERNS β
β β’ Real-time response delivery β
β β’ Progressive result updates β
β β’ Enhanced user experience β
ββββββββββββββββββββββ¬ββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β π RUNNABLE PASSTHROUGH β
β β’ Data flow control patterns β
β β’ Complex pipeline orchestration β
β β’ Input transformation strategies β
ββββββββββββββββββββββ¬ββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β π¨ GRAPHING & VISUALIZATION β
β β’ Chain visualization techniques β
β β’ Debug complex workflows β
β β’ Architecture documentation β
ββββββββββββββββββββββ¬ββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β π ADVANCED PARALLEL β
β β’ Complex parallel patterns β
β β’ Dynamic branching strategies β
β β’ Performance optimization β
βββββββββββββββββββββββββββββββββββββββββββπ Batch Processing - Efficient Multi-Input Handling β
Batch processing allows you to process multiple inputs efficiently, reducing overhead and improving throughput for production applications.
π― Understanding Batch Processing β
π BATCH PROCESSING PIPELINE π
(Multiple inputs, optimized processing)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π₯ INPUT BATCH β
β ["Question 1", "Question 2", "Question 3", "Question 4"] β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β β‘ BATCH OPTIMIZATION β
β β
β π« Sequential Processing (Slow): β
β Q1 β Process β Result β
β Q2 β Process β Result β
β Q3 β Process β Result β
β Q4 β Process β Result β
β β
β β
Batch Processing (Fast): β
β [Q1,Q2,Q3,Q4] β Process β [R1,R2,R3,R4] β
ββββββββββββββββββββββ¬ββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β π€ BATCH RESULTS β
β ["Answer 1", "Answer 2", "Answer 3", β
β "Answer 4"] β
βββββββββββββββββββββββββββββββββββββββββββπ Basic Batch Processing Example β
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Set up the chain
prompt = ChatPromptTemplate.from_template("Explain {topic} in one sentence.")
llm = AzureChatOpenAI(temperature=0)
parser = StrOutputParser()
chain = prompt | llm | parser
# Single input (normal way)
single_result = chain.invoke({"topic": "machine learning"})
print("Single:", single_result)
# Multiple inputs (batch way)
batch_inputs = [
{"topic": "machine learning"},
{"topic": "artificial intelligence"},
{"topic": "neural networks"},
{"topic": "deep learning"}
]
batch_results = chain.batch(batch_inputs)
for i, result in enumerate(batch_results):
print(f"Batch {i+1}: {result}")π Advanced Batch Patterns β
Pattern 1: Batch with Configuration β
# Configure batch processing parameters
batch_config = {
"max_concurrency": 3, # Process 3 items at once
"return_exceptions": True # Don't stop on errors
}
try:
batch_results = chain.batch(
batch_inputs,
config=batch_config
)
for i, result in enumerate(batch_results):
if isinstance(result, Exception):
print(f"Error in batch {i+1}: {result}")
else:
print(f"Success {i+1}: {result}")
except Exception as e:
print(f"Batch processing failed: {e}")Pattern 2: Async Batch Processing β
import asyncio
async def async_batch_processing():
# Async batch for even better performance
async_results = await chain.abatch(batch_inputs)
print("π Async Batch Results:")
for i, result in enumerate(async_results):
print(f" {i+1}. {result}")
# Run async batch
asyncio.run(async_batch_processing())Pattern 3: Batch with Progress Tracking β
from tqdm import tqdm
import time
def batch_with_progress(chain, inputs, batch_size=5):
"""Process inputs in batches with progress tracking"""
results = []
for i in tqdm(range(0, len(inputs), batch_size), desc="Processing batches"):
batch = inputs[i:i + batch_size]
batch_results = chain.batch(batch)
results.extend(batch_results)
time.sleep(0.1) # Small delay to avoid rate limits
return results
# Example: Process 50 questions in batches of 5
large_input_set = [{"topic": f"Technology topic {i}"} for i in range(50)]
results = batch_with_progress(chain, large_input_set)β‘ Streaming Patterns - Real-time Response Delivery β
Streaming allows you to get partial results as they're generated, creating a better user experience for long-running operations.
π― Understanding Streaming β
β‘ STREAMING RESPONSE PATTERN β‘
(Progressive result delivery)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π« WITHOUT STREAMING β
β β
β User: "Write a long essay about AI" β
β System: [........................] (waiting 30 seconds) β
β System: "Here's a 1000-word essay..." (all at once) β
β β
β π Poor user experience - long wait, no feedback β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β β
WITH STREAMING β
β β
β User: "Write a long essay about AI" β
β System: "Artificial intelligence is..." β
β System: "a rapidly evolving field..." β
β System: "that encompasses machine..." β
β System: "learning, deep learning..." β
β β
β π Great UX - immediate feedback β
βββββββββββββββββββββββββββββββββββββββββββπ Basic Streaming Example β
# Basic streaming
def stream_response(chain, input_data):
print("π― Streaming Response:")
for chunk in chain.stream(input_data):
print(chunk, end="", flush=True)
print("\nβ
Streaming complete!")
# Test streaming
stream_response(chain, {"topic": "quantum computing in detail"})π Advanced Streaming Patterns β
Pattern 1: Streaming with Token Counting β
def stream_with_metrics(chain, input_data):
"""Stream response while tracking metrics"""
token_count = 0
start_time = time.time()
response_parts = []
print("π Streaming with metrics:")
print("-" * 40)
for chunk in chain.stream(input_data):
response_parts.append(chunk)
token_count += len(chunk.split())
# Print chunk with metrics
print(f"[{token_count:3d} tokens] {chunk}", end="", flush=True)
end_time = time.time()
full_response = "".join(response_parts)
print(f"\n\nπ Metrics:")
print(f" Total tokens: {token_count}")
print(f" Time taken: {end_time - start_time:.2f} seconds")
print(f" Tokens/second: {token_count / (end_time - start_time):.1f}")
return full_responsePattern 2: Streaming with Real-time Processing β
def stream_and_process(chain, input_data, process_func=None):
"""Stream response and process chunks in real-time"""
processed_chunks = []
print("π Streaming with real-time processing:")
for chunk in chain.stream(input_data):
# Process each chunk as it arrives
if process_func:
processed_chunk = process_func(chunk)
processed_chunks.append(processed_chunk)
print(chunk, end="", flush=True)
print(f"\nβ
Processed {len(processed_chunks)} chunks")
return processed_chunks
# Example processor function
def analyze_sentiment(text):
# Simple sentiment analysis (you could use a real sentiment analyzer)
positive_words = ["good", "great", "excellent", "amazing", "wonderful"]
return "positive" if any(word in text.lower() for word in positive_words) else "neutral"
# Use streaming with processing
results = stream_and_process(
chain,
{"topic": "the amazing benefits of artificial intelligence"},
analyze_sentiment
)Pattern 3: Multi-Chain Streaming β
from langchain_core.runnables import RunnableParallel
# Create multiple chains for parallel streaming
summary_chain = ChatPromptTemplate.from_template(
"Summarize {topic} in 2 sentences"
) | llm | parser
detailed_chain = ChatPromptTemplate.from_template(
"Explain {topic} in detail with examples"
) | llm | parser
# Parallel streaming
def parallel_stream(topic):
parallel_chain = RunnableParallel({
"summary": summary_chain,
"detailed": detailed_chain
})
print(f"π Parallel streaming for: {topic}")
print("=" * 50)
for chunk in parallel_chain.stream({"topic": topic}):
for key, value in chunk.items():
print(f"[{key.upper()}]: {value}")π RunnablePassthrough - Advanced Data Flow Control β
RunnablePassthrough is a powerful utility for controlling how data flows through your LCEL chains, especially in complex scenarios.
π― Understanding RunnablePassthrough β
π RUNNABLE PASSTHROUGH PATTERNS π
(Data flow control strategies)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π‘ BASIC PASSTHROUGH β
β β
β Input: {"name": "Alice", "age": 30} β
β β β
β βΌ β
β RunnablePassthrough() βββββββββββββββββββββββ β
β β β β
β βΌ βΌ β
β Process name Original data β
β β β β
β βΌ βΌ β
β Output: {"processed": "Hello Alice", "original": {...}} β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π― SELECTIVE PASSTHROUGH β
β β
β RunnablePassthrough.assign( β
β new_field=some_chain, β
β another_field=another_chain β
β ) β
β β
β π Adds new fields while keeping original data β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββπ Basic RunnablePassthrough Examples β
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
# Example 1: Simple passthrough
def simple_passthrough_demo():
# Chain that processes and preserves input
process_name = RunnableLambda(lambda x: f"Hello, {x['name']}!")
chain = RunnableParallel({
"greeting": process_name,
"original": RunnablePassthrough()
})
result = chain.invoke({"name": "Alice", "age": 30})
print("Simple Passthrough:")
print(f" Greeting: {result['greeting']}")
print(f" Original: {result['original']}")
print()
simple_passthrough_demo()π Advanced RunnablePassthrough Patterns β
Pattern 1: RunnablePassthrough.assign() β
def passthrough_assign_demo():
"""Demonstrate RunnablePassthrough.assign() for adding fields"""
# Chains for processing different aspects
sentiment_analyzer = RunnableLambda(
lambda x: "positive" if "good" in x["text"].lower() else "negative"
)
word_counter = RunnableLambda(
lambda x: len(x["text"].split())
)
# Use assign to add new fields while keeping original
enhanced_chain = RunnablePassthrough.assign(
sentiment=sentiment_analyzer,
word_count=word_counter,
processed_at=RunnableLambda(lambda x: "2024-01-01")
)
input_data = {
"text": "This is a good example of text processing",
"user_id": "user123"
}
result = enhanced_chain.invoke(input_data)
print("RunnablePassthrough.assign() Demo:")
for key, value in result.items():
print(f" {key}: {value}")
print()
passthrough_assign_demo()Pattern 2: Conditional Passthrough β
def conditional_passthrough_demo():
"""Pass through data based on conditions"""
def conditional_processor(input_data):
if input_data.get("priority") == "high":
return {
**input_data,
"processing_path": "express",
"estimated_time": "5 minutes"
}
else:
return {
**input_data,
"processing_path": "standard",
"estimated_time": "15 minutes"
}
conditional_chain = RunnableLambda(conditional_processor)
# Test with different priorities
test_cases = [
{"request": "Process my order", "priority": "high"},
{"request": "Update my profile", "priority": "normal"}
]
print("Conditional Passthrough Demo:")
for case in test_cases:
result = conditional_chain.invoke(case)
print(f" Input: {case}")
print(f" Output: {result}")
print()
conditional_passthrough_demo()Pattern 3: Complex Data Transformation β
def complex_transformation_demo():
"""Demonstrate complex data flow with multiple passthroughs"""
# Create a complex chain that processes user data
user_validator = RunnableLambda(
lambda x: {**x, "valid": "@" in x.get("email", "")}
)
profile_enhancer = RunnablePassthrough.assign(
display_name=RunnableLambda(
lambda x: f"{x['first_name']} {x['last_name']}"
),
email_domain=RunnableLambda(
lambda x: x["email"].split("@")[1] if "@" in x.get("email", "") else "unknown"
)
)
final_formatter = RunnableLambda(
lambda x: {
"user_profile": {
"id": x["user_id"],
"name": x["display_name"],
"contact": {
"email": x["email"],
"domain": x["email_domain"]
}
},
"metadata": {
"valid": x["valid"],
"processed": True
}
}
)
# Combine into full processing chain
user_processing_chain = (
user_validator
| profile_enhancer
| final_formatter
)
# Test the complex chain
user_data = {
"user_id": "u123",
"first_name": "John",
"last_name": "Doe",
"email": "john.doe@example.com"
}
result = user_processing_chain.invoke(user_data)
print("Complex Transformation Demo:")
import json
print(json.dumps(result, indent=2))
complex_transformation_demo()π¨ Graphing and Visualizing Runnables β
Understanding complex LCEL chains becomes easier with visualization. Let's explore how to create visual representations of your chains.
π― Chain Visualization Fundamentals β
π¨ CHAIN VISUALIZATION STRATEGIES π¨
(Making complex chains understandable)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π SIMPLE LINEAR CHAIN β
β β
β Input β Prompt β LLM β Parser β Output β
β β
β β
Easy to understand visually β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββΌβββββββββββββββββββββ
β π COMPLEX PARALLEL CHAIN β
β β
β Input β
β ββββ Branch 1 βββ Result 1 β
β ββββ Branch 2 βββ Result 2 β
β ββββ Branch 3 βββ Result 3 β
β β β
β Combine Results β
β β β
β Output β
β β
β π Needs visualization for clarity β
βββββββββββββββββββββββββββββββββββββββββββπ ASCII Chain Diagrams β
def create_chain_diagram(chain_description):
"""Create ASCII diagram for chain visualization"""
diagrams = {
"simple": """
π Input
β
βΌ
βββββββββββββββ
β Prompt β
βββββββββββββββ
β
βΌ
βββββββββββββββ
β LLM β
βββββββββββββββ
β
βΌ
βββββββββββββββ
β Parser β
βββββββββββββββ
β
βΌ
π€ Output
""",
"parallel": """
π Input
β
βΌ
βββββββββββββββ
β Splitter β
βββββββββββββββ
β
βββββββββββββββ¬ββββββββββββββ
βΌ βΌ βΌ
βββββββββββ βββββββββββ βββββββββββ
β Chain A β β Chain B β β Chain C β
βββββββββββ βββββββββββ βββββββββββ
β β β
βββββββββββββββΌββββββββββββββ
βΌ
βββββββββββββββ
β Combiner β
βββββββββββββββ
β
βΌ
π€ Output
""",
"conditional": """
π Input
β
βΌ
βββββββββββββββ
β Condition? β
βββββββββββββββ
β
ββββ TRUE βββββ¬βββ FALSE βββ
βΌ βΌ βΌ
βββββββββββ βββββββββββ βββββββββββ
β Path A β β Path B β βDefault β
βββββββββββ βββββββββββ βββββββββββ
β β β
βββββββββββββββΌβββββββββββββ
βΌ
π€ Output
"""
}
return diagrams.get(chain_description, "Diagram not found")
# Example usage
print("Simple Chain Diagram:")
print(create_chain_diagram("simple"))
print("\nParallel Chain Diagram:")
print(create_chain_diagram("parallel"))π Interactive Chain Analysis β
def analyze_chain_structure(chain, input_example):
"""Analyze and describe chain structure"""
print("π Chain Structure Analysis")
print("=" * 40)
# Basic chain info
print(f"Chain type: {type(chain).__name__}")
# Test with sample input
try:
print(f"\nπ Sample input: {input_example}")
# If possible, show intermediate steps
if hasattr(chain, 'stream'):
print("\nβ‘ Processing steps:")
for i, chunk in enumerate(chain.stream(input_example)):
print(f" Step {i+1}: {chunk}")
else:
result = chain.invoke(input_example)
print(f"\nπ€ Final output: {result}")
except Exception as e:
print(f"\nβ Error analyzing chain: {e}")
# Memory usage estimation
import sys
chain_size = sys.getsizeof(chain)
print(f"\nπ Estimated chain size: {chain_size} bytes")
# Example analysis
sample_chain = prompt | llm | parser
analyze_chain_structure(sample_chain, {"topic": "AI"})π― Chain Performance Profiling β
import time
from functools import wraps
def profile_chain_performance(chain):
"""Profile chain performance with detailed metrics"""
def timing_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"β±οΈ {func.__name__}: {execution_time:.4f} seconds")
return result
return wrapper
# Wrap chain methods with timing
original_invoke = chain.invoke
original_stream = getattr(chain, 'stream', None)
original_batch = getattr(chain, 'batch', None)
chain.invoke = timing_decorator(original_invoke)
if original_stream:
chain.stream = timing_decorator(original_stream)
if original_batch:
chain.batch = timing_decorator(original_batch)
return chain
# Example usage
profiled_chain = profile_chain_performance(chain)
print("π Performance Profiling:")
result = profiled_chain.invoke({"topic": "machine learning"})π RunnableParallel - Advanced Parallel Patterns β
RunnableParallel enables sophisticated parallel processing patterns that can dramatically improve performance and enable complex workflows.
π― Advanced Parallel Architectures β
π ADVANCED PARALLEL PATTERNS π
(Sophisticated parallel processing)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ποΈ PATTERN 1: FAN-OUT/FAN-IN β
β β
β Input β
β β β
β βββββββΌββββββ β
β βΌ βΌ βΌ β
β Task1 Task2 Task3 (Fan-out: Split work) β
β β β β β
β βββββββΌββββββ β
β βΌ β
β Combine (Fan-in: Merge results) β
β β β
β Output β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β π― PATTERN 2: PIPELINE PARALLEL β
β β
β Stage1 β Stage2 β Stage3 β
β β β β β
β Parallel Parallel Parallel β
β Tasks Tasks Tasks β
β β
β π Each stage processes multiple items simultaneously β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β‘ PATTERN 3: DYNAMIC PARALLEL β
β β
β Input β Analyzer β Dynamic Task Generator β
β β β β
β βΌ βΌ β
β Route to N Create N parallel β
β different chains based on β
β processors analysis β
β β
β π€ Intelligent parallel processing based on content β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββπ Advanced Parallel Examples β
Pattern 1: Multi-Modal Processing β
def multimodal_parallel_demo():
"""Process different types of content in parallel"""
# Different processors for different content types
text_processor = ChatPromptTemplate.from_template(
"Analyze this text for sentiment: {content}"
) | llm | parser
metadata_extractor = RunnableLambda(
lambda x: {
"word_count": len(x["content"].split()),
"char_count": len(x["content"]),
"has_numbers": any(c.isdigit() for c in x["content"])
}
)
keyword_extractor = ChatPromptTemplate.from_template(
"Extract 3 key topics from: {content}"
) | llm | parser
# Parallel processing chain
multimodal_chain = RunnableParallel({
"sentiment": text_processor,
"metadata": metadata_extractor,
"keywords": keyword_extractor,
"original": RunnablePassthrough()
})
# Test with sample content
test_content = {
"content": "Artificial intelligence is revolutionizing how we work and live. "
"From healthcare to finance, AI applications are growing rapidly. "
"By 2025, AI will impact every industry significantly."
}
print("π Multi-Modal Parallel Processing:")
result = multimodal_chain.invoke(test_content)
for key, value in result.items():
print(f" {key.upper()}: {value}")
print()
multimodal_parallel_demo()Pattern 2: Competitive Processing β
def competitive_processing_demo():
"""Run multiple approaches and select the best result"""
# Different approaches to the same problem
approach_a = ChatPromptTemplate.from_template(
"Solve this step by step: {problem}"
) | llm | parser
approach_b = ChatPromptTemplate.from_template(
"Think creatively about this problem: {problem}"
) | llm | parser
approach_c = ChatPromptTemplate.from_template(
"Use logical reasoning for: {problem}"
) | llm | parser
# Parallel competitive processing
competitive_chain = RunnableParallel({
"step_by_step": approach_a,
"creative": approach_b,
"logical": approach_c
})
# Selector to choose best approach
def select_best_solution(results):
# Simple heuristic: longest response (you could use more sophisticated selection)
best_key = max(results.keys(), key=lambda k: len(results[k]))
return {
"selected_approach": best_key,
"best_solution": results[best_key],
"all_approaches": results
}
full_competitive_chain = (
competitive_chain
| RunnableLambda(select_best_solution)
)
# Test competitive processing
problem = {
"problem": "How can we reduce plastic waste in oceans?"
}
print("π Competitive Processing Demo:")
result = full_competitive_chain.invoke(problem)
print(f"Selected approach: {result['selected_approach']}")
print(f"Best solution: {result['best_solution']}")
print("\nAll approaches:")
for approach, solution in result['all_approaches'].items():
print(f" {approach}: {solution[:100]}...")
competitive_processing_demo()Pattern 3: Hierarchical Parallel Processing β
def hierarchical_parallel_demo():
"""Multi-level parallel processing with dependencies"""
# Level 1: Initial analysis
content_analyzer = RunnableLambda(
lambda x: {
"type": "technical" if "AI" in x["text"] else "general",
"complexity": "high" if len(x["text"]) > 100 else "low"
}
)
# Level 2: Specialized processing based on analysis
def create_specialized_processor(analysis):
if analysis["type"] == "technical":
return ChatPromptTemplate.from_template(
"Provide technical analysis of: {text}"
) | llm | parser
else:
return ChatPromptTemplate.from_template(
"Provide general summary of: {text}"
) | llm | parser
# Level 3: Quality assessment
quality_assessor = ChatPromptTemplate.from_template(
"Rate the quality of this analysis (1-10): {analysis}"
) | llm | parser
# Hierarchical chain
def hierarchical_processor(input_data):
# Level 1: Analyze content
analysis = content_analyzer.invoke(input_data)
# Level 2: Parallel specialized processing
specialized_processor = create_specialized_processor(analysis)
level2_parallel = RunnableParallel({
"specialized_analysis": specialized_processor,
"metadata": RunnableLambda(lambda x: analysis),
"original": RunnablePassthrough()
})
level2_result = level2_parallel.invoke(input_data)
# Level 3: Quality assessment
quality_score = quality_assessor.invoke({
"analysis": level2_result["specialized_analysis"]
})
return {
**level2_result,
"quality_score": quality_score
}
hierarchical_chain = RunnableLambda(hierarchical_processor)
# Test hierarchical processing
test_input = {
"text": "AI and machine learning are transforming industries through "
"automated decision-making, predictive analytics, and intelligent "
"process optimization. These technologies enable unprecedented "
"efficiency and accuracy in complex problem-solving scenarios."
}
print("ποΈ Hierarchical Parallel Processing:")
result = hierarchical_chain.invoke(test_input)
for key, value in result.items():
print(f" {key}: {value}")
print()
hierarchical_parallel_demo()π― Production Patterns and Best Practices β
Error Handling in Parallel Chains β
from langchain_core.runnables import RunnableLambda
def robust_parallel_processing():
"""Demonstrate error handling in parallel chains"""
# Chain that might fail
risky_processor = RunnableLambda(
lambda x: 1/0 if x.get("trigger_error") else f"Processed: {x['data']}"
)
# Safe fallback processor
safe_processor = RunnableLambda(
lambda x: f"Safe processing: {x['data']}"
)
# Parallel chain with error handling
def safe_parallel_processor(input_data):
try:
return RunnableParallel({
"primary": risky_processor,
"backup": safe_processor,
"metadata": RunnablePassthrough()
}).invoke(input_data)
except Exception as e:
print(f"β οΈ Primary processor failed: {e}")
return {
"primary": "FAILED",
"backup": safe_processor.invoke(input_data),
"metadata": input_data,
"error": str(e)
}
robust_chain = RunnableLambda(safe_parallel_processor)
# Test with both success and failure cases
test_cases = [
{"data": "normal data", "trigger_error": False},
{"data": "error data", "trigger_error": True}
]
print("π‘οΈ Robust Parallel Processing:")
for i, case in enumerate(test_cases):
print(f"\nTest case {i+1}: {case}")
result = robust_chain.invoke(case)
print(f"Result: {result}")
robust_parallel_processing()Performance Monitoring β
import time
from datetime import datetime
def monitored_parallel_chain():
"""Parallel chain with performance monitoring"""
def monitor_performance(chain_name):
def decorator(func):
def wrapper(input_data):
start_time = time.perf_counter()
start_memory = None # Could add memory monitoring
try:
result = func(input_data)
status = "SUCCESS"
except Exception as e:
result = f"ERROR: {e}"
status = "FAILED"
end_time = time.perf_counter()
execution_time = end_time - start_time
# Log performance metrics
metrics = {
"chain_name": chain_name,
"execution_time": execution_time,
"status": status,
"timestamp": datetime.now().isoformat()
}
print(f"π {chain_name}: {execution_time:.4f}s [{status}]")
return {
"result": result,
"metrics": metrics
}
return wrapper
return decorator
# Create monitored processors
@monitor_performance("text_analyzer")
def text_analyzer(input_data):
time.sleep(0.1) # Simulate processing time
return f"Analysis: {input_data['text'][:50]}..."
@monitor_performance("sentiment_detector")
def sentiment_detector(input_data):
time.sleep(0.05) # Simulate processing time
return "positive" if "good" in input_data['text'].lower() else "neutral"
@monitor_performance("keyword_extractor")
def keyword_extractor(input_data):
time.sleep(0.08) # Simulate processing time
words = input_data['text'].split()
return words[:3] # Return first 3 words as "keywords"
# Monitored parallel chain
monitored_chain = RunnableParallel({
"analysis": RunnableLambda(text_analyzer),
"sentiment": RunnableLambda(sentiment_detector),
"keywords": RunnableLambda(keyword_extractor)
})
# Test monitoring
test_input = {
"text": "This is a good example of text that we want to process "
"using our advanced parallel processing system."
}
print("π Performance Monitored Parallel Processing:")
result = monitored_chain.invoke(test_input)
print("\nResults:")
for key, value in result.items():
print(f" {key}: {value}")
monitored_parallel_chain()π Learning Path and Next Steps β
Mastery Checklist β
def lcel_mastery_checklist():
"""Self-assessment checklist for LCEL mastery"""
skills = {
"Basic LCEL": [
"β
Create simple prompt | llm | parser chains",
"β
Use invoke() for single inputs",
"β
Understand pipe operator (|) syntax"
],
"Batch Processing": [
"β‘ Implement efficient batch processing",
"β‘ Configure batch parameters (concurrency, error handling)",
"β‘ Use async batch processing for performance"
],
"Streaming": [
"β‘ Implement real-time streaming responses",
"β‘ Add streaming metrics and monitoring",
"β‘ Handle streaming errors gracefully"
],
"RunnablePassthrough": [
"β‘ Use RunnablePassthrough for data flow control",
"β‘ Implement RunnablePassthrough.assign() patterns",
"β‘ Create complex data transformation pipelines"
],
"Parallel Processing": [
"β‘ Design efficient parallel processing architectures",
"β‘ Implement competitive processing patterns",
"β‘ Build hierarchical parallel chains"
],
"Visualization & Debugging": [
"β‘ Create visual representations of complex chains",
"β‘ Implement performance monitoring",
"β‘ Debug complex chain issues effectively"
],
"Production Readiness": [
"β‘ Implement robust error handling",
"β‘ Add comprehensive monitoring and logging",
"β‘ Optimize chains for production performance"
]
}
print("π LCEL Advanced Mastery Checklist")
print("=" * 50)
for category, skill_list in skills.items():
print(f"\nπ {category}:")
for skill in skill_list:
print(f" {skill}")
print(f"\nπ‘ Focus on completing the β‘ items to achieve mastery!")
lcel_mastery_checklist()Next Steps β
- Practice Projects: Build increasingly complex LCEL applications
- Performance Optimization: Focus on production-ready patterns
- Integration: Combine LCEL with other LangChain components
- Community: Share your patterns and learn from others
Related Documentation:
- LCEL Basics: Start with fundamentals
- Memory Integration: Add memory to LCEL chains
- Agent Building: Create autonomous agents with LCEL
- RAG Systems: Build retrieval-augmented generation systems
Key Takeaways:
- Batch processing dramatically improves throughput for multiple inputs
- Streaming provides better user experience for long-running operations
- RunnablePassthrough enables sophisticated data flow control
- Parallel processing unlocks complex, high-performance architectures
- Visualization is crucial for understanding and debugging complex chains