LangChain Expression Language (LCEL) - Step by Step Guide
Learn how to build clean, efficient AI pipelines using LangChain's declarative syntax, step by step from basics to advanced concepts
What is LangChain Expression Language (LCEL)?
Definition: A declarative way to compose LangChain components into workflows using simple operators like | (pipe) to create clean, readable, and reusable AI pipelines.
Simple Analogy: Think of LCEL like a recipe where you connect cooking steps with arrows. Instead of writing detailed instructions for each step, you just say: ingredients | mix | bake | serve. Each step knows what to do with what it receives.
Let's understand this step by step:
LCEL: From Manual to Automatic (The Great Simplification)

Before LCEL (the old way), the developer wired every step by hand:
1. prompt = create_prompt_template()
2. filled_prompt = prompt.format(user_input)
3. model = setup_llm()
4. response = model.invoke(filled_prompt)
5. parsed = parse_output(response)
6. return parsed
Problems: verbose, repetitive, error-prone.

With LCEL (the modern way), the developer writes a declarative chain:
chain = prompt | model | parser
result = chain.invoke(user_input)
Benefits: clean, reusable, composable.

The rest happens automatically:
- The prompt gets filled automatically
- Data flows through each step
- Error handling is built in
- Streaming & async support
- Debugging & tracing included

Understanding LCEL Core Concepts - What Makes It Special?
Let's break down what makes LCEL so powerful by examining its core features step by step.
Step 1: The Pipe Operator (|) - Connecting the Dots
Think of the pipe operator as a conveyor belt that automatically passes data from one station to the next:
The LCEL Assembly Line (how data flows through):
1. User input: "What is the capital of France?"
2. Prompt template - takes the raw input, formats it into a proper prompt, and outputs "Answer this question: What is the capital..."
3. Language model - takes the formatted prompt, generates the AI response, and outputs "The capital of France is Paris."
4. Output parser - takes the raw AI response, cleans and formats it, and outputs "Paris"
Final result: "Paris"
Key Insight: Each component only needs to know about its immediate input and output. The pipe operator handles all the connection logic automatically.
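To make that hand-off concrete, here is a minimal sketch (the question text and the OpenAI chat model are illustrative assumptions, not requirements) that first invokes each station by hand and then lets the pipe operator do the same wiring:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("Answer this question: {question}")
model = ChatOpenAI(temperature=0)
parser = StrOutputParser()

# Station by station: each component only sees its immediate input
prompt_value = prompt.invoke({"question": "What is the capital of France?"})  # a PromptValue
ai_message = model.invoke(prompt_value)                                       # an AIMessage
answer = parser.invoke(ai_message)                                            # a plain string, e.g. "Paris"

# The same hand-off, wired automatically by the pipe operator
chain = prompt | model | parser
answer = chain.invoke({"question": "What is the capital of France?"})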
Step 2: Composability - Building Blocks Like Lego
LCEL components are designed to work together seamlessly:
LCEL Building Blocks (mix and match components)

Basic building blocks:
- Prompts: templates, chat prompts, few-shot prompts
- Models: LLMs, chat models, local models
- Parsers: string, JSON, custom

Advanced components:
- Tools: calculators, web search, APIs
- Memory: conversation, buffer, summary
- Retrievers: vector DBs, documents, knowledge bases
- Functions: custom logic, transforms, validators

Combine them any way you want (a short sketch follows this list):
- Simple: prompt | model | parser
- With tools: prompt | model | tool | parser
- With memory: memory + prompt | model | parser
- Complex: retriever | prompt | model | validator
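As a hedged illustration of the mix-and-match idea, the sketch below reuses one chat model with two different prompt/parser combinations; the prompt wording and model choice are assumptions, not part of LCEL itself:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser

model = ChatOpenAI(temperature=0)

# Swap one block for another: same model, different prompt and parser
summary_chain = (
    ChatPromptTemplate.from_template("Summarize this review in one sentence: {review}")
    | model
    | StrOutputParser()
)
structured_chain = (
    ChatPromptTemplate.from_template(
        "Return a JSON object with keys 'sentiment' and 'topics' for this review: {review}"
    )
    | model
    | JsonOutputParser()
)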
Step 3: Automatic Features - What You Get for Free
When you use LCEL, you automatically get powerful features without writing extra code:
LCEL Automatic Features (what you get without asking)

Input/output handling:
- Automatic type conversion: string → ChatMessage, dict → PromptValue, list → multiple inputs
- Schema validation: checks input format, validates data types, gives clear error messages

Performance features:
- Streaming support: real-time output, progressive results, better user experience
- Async & parallel: multiple chains at once, non-blocking execution, faster processing

Debugging & monitoring:
- Built-in tracing: see exactly what happens, with performance metrics and error tracking
- LangSmith integration: visual debugging, performance analytics, production monitoring
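These entry points come on every chain without extra wiring. As a minimal, hedged sketch (the model and questions are illustrative), the async variants ainvoke and abatch sit alongside invoke, batch, and stream:
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = (
    ChatPromptTemplate.from_template("Answer briefly: {question}")
    | ChatOpenAI(temperature=0)
    | StrOutputParser()
)

async def main():
    # Non-blocking single call
    answer = await chain.ainvoke({"question": "What is LCEL?"})
    # Several questions processed concurrently
    answers = await chain.abatch([
        {"question": "What is a prompt template?"},
        {"question": "What does an output parser do?"},
    ])
    print(answer)
    print(answers)

asyncio.run(main())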
LCEL vs Traditional Chains - A Clear Comparison
Let's see the difference with a realistic example: a two-step pipeline that translates a text and then summarizes it.
Traditional Approach (The Old Way)
# Example: Creating a translation and summary chain
from langchain.chains import LLMChain, SimpleSequentialChain
from langchain.prompts import PromptTemplate
# Step 1: Create translation chain
translate_prompt = PromptTemplate(
input_variables=["text"],
template="Translate this to French: {text}"
)
translate_chain = LLMChain(
llm=llm,
prompt=translate_prompt,
output_key="translated_text"
)
# Step 2: Create summary chain
summary_prompt = PromptTemplate(
input_variables=["text"],
template="Summarize this text: {text}"
)
summary_chain = LLMChain(
llm=llm,
prompt=summary_prompt,
output_key="summary"
)
# Step 3: Combine into sequential chain
full_chain = SimpleSequentialChain(
chains=[translate_chain, summary_chain],
verbose=True
)
# Step 4: Use the chain
result = full_chain.run("Your text here")
LCEL Approach (The Modern Way)
# Same functionality with LCEL
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Define prompts
translate_prompt = ChatPromptTemplate.from_template(
"Translate this to French: {text}"
)
summary_prompt = ChatPromptTemplate.from_template(
"Summarize this text: {text}"
)
# Create parser
parser = StrOutputParser()
# The magic: one-line chain creation
chain = (
translate_prompt
| llm
| {"text": parser}
| summary_prompt
| llm
| parser
)
# Use the chain
result = chain.invoke({"text": "Your text here"})
Side-by-Side Comparison
| Feature | Traditional Chains | LCEL |
|---|---|---|
| Lines of Code | ~20 lines | ~8 lines |
| Readability | Complex, nested | Clean, linear |
| Debugging | Manual | Built-in tracing |
| Reusability | Hard to modify | Easy to extend |
| Streaming | Extra setup | Automatic |
| Type Safety | Manual | Automatic |
| Error Handling | Manual | Built-in |
Step-by-Step: Building Your First LCEL Chain
Let's build a practical example from scratch to see how LCEL works in practice.
Goal: Building a Smart Assistant
We'll create an assistant that:
- Takes a user question
- Adds helpful context
- Gets an AI response
- Formats it nicely
Step 1: Set Up Components
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Set up the model
llm = ChatOpenAI(temperature=0)
# Create a helpful prompt template
prompt = ChatPromptTemplate.from_template("""
You are a helpful assistant. Answer the following question clearly and concisely:
Question: {question}
Please provide a helpful answer with examples if relevant.
""")
# Set up output parser for clean string output
parser = StrOutputParser()
Step 2: Connect with LCEL
# The LCEL magic: create the chain
chain = prompt | llm | parser
# That's it! Your chain is ready
Step 3: Use Your Chain
# Single invocation
result = chain.invoke({"question": "What is machine learning?"})
print(result)
# Batch processing
questions = [
{"question": "What is Python?"},
{"question": "How does AI work?"},
{"question": "What is data science?"}
]
results = chain.batch(questions)
for q, r in zip(questions, results):
print(f"Q: {q['question']}")
print(f"A: {r}\n")
# Streaming response
for chunk in chain.stream({"question": "Explain quantum computing"}):
    print(chunk, end="")
Step 4: Extend Your Chain
Adding features is incredibly easy with LCEL:
# Add input validation
from langchain_core.runnables import RunnableLambda
def validate_question(input_dict):
question = input_dict["question"]
if len(question) < 3:
raise ValueError("Question too short")
return input_dict
# Add response enhancement
def enhance_response(response):
return f"π‘ **Answer**: {response}\n\n---\n*Generated by AI Assistant*"
# Extended chain with validation and enhancement
enhanced_chain = (
RunnableLambda(validate_question)
| prompt
| llm
| parser
| RunnableLambda(enhance_response)
)
# Test the enhanced chain
result = enhanced_chain.invoke({"question": "What is blockchain?"})
print(result)
Advanced LCEL Patterns - Parallel Processing and Branching
Now let's explore more sophisticated patterns that showcase LCEL's real power.
Parallel Processing - Doing Multiple Things at Once
The Parallel Processing Pattern (multiple paths, faster results):
1. User input arrives.
2. An input splitter forks the data into several paths.
3. Each path runs at the same time - for example, Path A translates to French, Path B summarizes the content, Path C analyzes sentiment.
4. A result combiner joins the outputs into one combined output.
Real Implementation:
from langchain_core.runnables import RunnableParallel
# Define individual chains
translate_chain = translate_prompt | llm | parser
summarize_chain = summarize_prompt | llm | parser
sentiment_chain = sentiment_prompt | llm | parser
# Parallel execution
parallel_chain = RunnableParallel({
"translation": translate_chain,
"summary": summarize_chain,
"sentiment": sentiment_chain
})
# Combine with final processing
def format_results(results):
return f"""
**Original Analysis Complete**
**Translation**: {results['translation']}
**Summary**: {results['summary']}
**Sentiment**: {results['sentiment']}
"""
# Full parallel chain
full_chain = (
{"text": RunnableLambda(lambda x: x["text"])}
| parallel_chain
| RunnableLambda(format_results)
)
# Use it
result = full_chain.invoke({"text": "Your text here"})
print(result)
Conditional Branching - Smart Decision Making
from langchain_core.runnables import RunnableBranch
# Define different response styles
casual_prompt = ChatPromptTemplate.from_template(
"Answer this casually: {question}"
)
formal_prompt = ChatPromptTemplate.from_template(
"Provide a formal, academic response to: {question}"
)
technical_prompt = ChatPromptTemplate.from_template(
"Give a technical explanation for: {question}"
)
# Smart branching logic
def choose_style(input_dict):
question = input_dict["question"].lower()
if "explain" in question or "how" in question:
return "technical"
elif "formal" in question or "academic" in question:
return "formal"
else:
return "casual"
# Create branching chain
branching_chain = RunnableBranch(
(lambda x: choose_style(x) == "technical", technical_prompt | llm | parser),
(lambda x: choose_style(x) == "formal", formal_prompt | llm | parser),
casual_prompt | llm | parser # default
)
# Test different question types
questions = [
{"question": "How does neural networks work?"}, # β Technical
{"question": "What's up with AI?"}, # β Casual
{"question": "Provide formal analysis of ML"}, # β Formal
]
for q in questions:
result = branching_chain.invoke(q)
print(f"Q: {q['question']}")
print(f"A: {result}\n")Real-World LCEL Applications - Practical Examples β
Let's see how LCEL is used in production applications.
Application 1: RAG (Retrieval-Augmented Generation) System
# Complete RAG system with LCEL
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnablePassthrough
# Set up retrieval
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
# RAG prompt
rag_prompt = ChatPromptTemplate.from_template("""
Use the following context to answer the question:
Context: {context}
Question: {question}
Answer based on the context provided. If you don't know, say so.
""")
# RAG chain with LCEL
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| rag_prompt
| llm
| parser
)
# Use the RAG system
answer = rag_chain.invoke("What is the company policy on remote work?")
Application 2: Multi-Step Research Assistant
# Research assistant that searches, analyzes, and reports
search_prompt = ChatPromptTemplate.from_template(
"Generate search queries for: {topic}"
)
analyze_prompt = ChatPromptTemplate.from_template(
"Analyze this information and extract key insights: {content}"
)
report_prompt = ChatPromptTemplate.from_template(
"Create a comprehensive report on {topic} using these insights: {insights}"
)
# Multi-step research chain (illustrative sketch; `web_search_tool` stands in for
# whatever search tool or runnable you have available)
research_chain = (
    {"topic": RunnablePassthrough()}
    | RunnablePassthrough.assign(queries=search_prompt | llm | parser)
    | RunnablePassthrough.assign(content=lambda x: web_search_tool.invoke(x["queries"]))
    | RunnablePassthrough.assign(insights=analyze_prompt | llm | parser)
    | report_prompt
    | llm
    | parser
)
Application 3: Customer Service Bot
# Smart customer service with escalation
classification_prompt = ChatPromptTemplate.from_template(
"Classify this customer inquiry: {message}\nCategories: billing, technical, general"
)
# Different response templates
billing_prompt = ChatPromptTemplate.from_template(
"Handle this billing inquiry professionally: {message}"
)
technical_prompt = ChatPromptTemplate.from_template(
"Provide technical support for: {message}"
)
general_prompt = ChatPromptTemplate.from_template(
"Respond helpfully to: {message}"
)
# Smart customer service chain
def route_to_specialist(classified_result):
category = classified_result.lower()
if "billing" in category:
return billing_prompt
elif "technical" in category:
return technical_prompt
else:
return general_prompt
customer_service_chain = (
    {"message": RunnablePassthrough()}
    | {"classification": classification_prompt | llm | parser, "message": lambda x: x["message"]}
    | RunnableLambda(
        lambda x: (route_to_specialist(x["classification"]) | llm | parser).invoke({"message": x["message"]})
    )
)
LCEL Best Practices - Writing Clean, Maintainable Chains
Practice 1: Keep Chains Readable
# Avoid: one giant unreadable chain
bad_chain = prompt | llm | parser | validator | formatter | emailer | logger
# Better: break into logical sections
process_input = prompt | llm | parser
validate_output = validator | formatter
send_result = emailer | logger
good_chain = process_input | validate_output | send_result
Practice 2: Use Meaningful Names
# Avoid: generic names
chain = prompt | llm | parser
# Better: descriptive names
customer_query_handler = (
customer_prompt
| support_llm
| response_parser
)
Practice 3: Add Error Handling
from langchain_core.runnables import RunnableLambda
def safe_invoke(func):
def wrapper(input_data):
try:
return func(input_data)
except Exception as e:
return {"error": f"Processing failed: {str(e)}"}
return wrapper
# Add error handling to chains
safe_chain = (
RunnableLambda(safe_invoke(lambda x: validate_input(x)))
| prompt
| llm
| RunnableLambda(safe_invoke(lambda x: parse_output(x)))
)
Practice 4: Make Chains Testable
# Create testable components
input_validator = RunnableLambda(validate_user_input)
prompt_formatter = user_prompt | template_filler
ai_processor = llm | response_parser
output_formatter = RunnableLambda(format_final_response)
# Compose the full chain
full_chain = (
input_validator
| prompt_formatter
| ai_processor
| output_formatter
)
# Easy to test individual components
test_input = {"question": "What is AI?"}
assert input_validator.invoke(test_input) == expected_output
Common LCEL Patterns - Reusable Solutions
Pattern 1: Input Transformation
# Transform input before processing
input_transformer = RunnableLambda(lambda x: {
"question": x["user_input"].strip().lower(),
"context": get_user_context(x["user_id"])
})
chain = input_transformer | prompt | llm | parser
Pattern 2: Output Enhancement
# Enhance output with metadata
def add_metadata(response):
return {
"answer": response,
"timestamp": datetime.now(),
"model": "gpt-4",
"confidence": calculate_confidence(response)
}
enhanced_chain = prompt | llm | parser | RunnableLambda(add_metadata)
Pattern 3: Fallback Chains
# Create fallback for when primary chain fails
primary_chain = complex_prompt | advanced_llm | parser
fallback_chain = simple_prompt | basic_llm | parser
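# A related option (a hedged aside, not from the original text): LCEL runnables
# also expose .with_retry() for transient failures, which can be combined with
# fallbacks; the retry count here is an arbitrary example value.
retrying_chain = primary_chain.with_retry(stop_after_attempt=3)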
robust_chain = primary_chain.with_fallbacks([fallback_chain])
Debugging LCEL Chains - Finding and Fixing Issues
Built-in Debugging Features
# Enable global debug logging while developing
from langchain.globals import set_debug
set_debug(True)
chain = prompt | llm | parser
result = chain.invoke({"question": "test"})
# Use intermediate steps
intermediate_chain = (
prompt
| RunnableLambda(lambda x: print(f"Prompt output: {x}") or x)
| llm
| RunnableLambda(lambda x: print(f"LLM output: {x}") or x)
| parser
)
LangSmith Integration
import os
from langsmith import Client
# Set up LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
# Your chains automatically get traced
traced_chain = prompt | llm | parser
result = traced_chain.invoke({"question": "test"})
# View detailed traces in the LangSmith dashboard
Custom Debugging
def debug_step(step_name):
def debug_func(input_data):
print(f"π {step_name}: {input_data}")
return input_data
return RunnableLambda(debug_func)
# Add debugging to your chain
debug_chain = (
debug_step("Input received")
| prompt
| debug_step("Prompt formatted")
| llm
| debug_step("LLM responded")
| parser
| debug_step("Final output")
)
Performance Optimization - Making LCEL Chains Faster
Optimization 1: Parallel Processing
# Instead of sequential processing
slow_chain = step1 | step2 | step3
# Use parallel where possible
fast_chain = RunnableParallel({
"result1": step1,
"result2": step2,
"result3": step3
})
Optimization 2: Streaming
# Enable streaming for better user experience
streaming_chain = prompt | llm | parser
# Stream results in real-time
for chunk in streaming_chain.stream({"question": "Long question"}):
    print(chunk, end="")
Optimization 3: Caching
from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache
# Enable caching to avoid repeat calls
set_llm_cache(InMemoryCache())
cached_chain = prompt | llm | parser
LCEL vs Other Approaches - When to Use What
Decision Matrix
| Use Case | LCEL | Traditional Chains | Custom Code |
|---|---|---|---|
| Simple Q&A | β Perfect | β Overkill | β Too much work |
| Complex Workflows | β Excellent | β οΈ Gets messy | β Hard to maintain |
| Parallel Processing | β Built-in | β Complex setup | β οΈ Possible but hard |
| Streaming | β Automatic | β Manual work | β Lots of code |
| Debugging | β Built-in tools | β οΈ Limited | β DIY |
| Learning Curve | β οΈ Medium | β Familiar | β Steep |
Recommendation Guide
Choose LCEL when:
- Building new LangChain applications
- Need clean, maintainable code
- Want built-in debugging and tracing
- Planning to scale or extend functionality
- Working with teams (better readability)
Choose Traditional Chains when:
- Working with legacy code
- Need specific chain behaviors not in LCEL
- Team is not familiar with LCEL yet
Choose Custom Code when:
- Very specific requirements not covered by LangChain
- Performance is absolutely critical
- Working outside the LangChain ecosystem
Learning Path - Mastering LCEL Step by Step
Beginner Path (Week 1-2)
- Basic Chains: prompt | llm | parser
- Input/Output: Understanding data flow
- Simple Examples: Q&A, translation, summarization
- Error Handling: Basic try/catch patterns
Intermediate Path (Week 3-4)
- Parallel Processing: RunnableParallel
- Conditional Logic: RunnableBranch
- Custom Components: RunnableLambda
- Real Applications: RAG, chatbots, analysis tools
Advanced Path (Week 5-6)
- Complex Workflows: Multi-step processes
- Performance Optimization: Streaming, caching, async
- Production Deployment: Error handling, monitoring
- Custom Tools: Building reusable components
Expert Path (Ongoing)
- Architecture Patterns: Microservices with LCEL
- Scaling: Distributed processing
- Monitoring: Advanced debugging and analytics
- Contributing: Building tools for the community
Next Steps
Ready to dive deeper? Explore these related topics:
- Prompt Engineering: Master the art of crafting effective prompts
- Memory Systems: Add conversation memory to your LCEL chains
- RAG Implementation: Build retrieval-augmented generation systems
- Agent Development: Create autonomous AI agents with LCEL
Key Takeaways:
- LCEL simplifies chain creation with the pipe operator (|)
- Automatic features like streaming, async, and debugging come built-in
- Composable design makes it easy to build complex workflows
- Production ready with built-in error handling and monitoring
- Learn progressively from simple chains to complex applications
Core LCEL Operators
| Pipe Operator (Sequential Composition)
The pipe operator | is the foundation of LCEL, allowing you to chain components sequentially.
Basic Pipe Operations
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.llms import OpenAI
# Basic pipe chain
llm = OpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
chain = prompt | llm | StrOutputParser()
# Execute chain
result = chain.invoke({"topic": "programming"})
print(result)  # Returns a programming joke as a string
Multi-Step Processing
from langchain.prompts import PromptTemplate
from langchain.output_parsers.json import SimpleJsonOutputParser
# Multi-step analysis chain
analysis_prompt = PromptTemplate.from_template("""
Analyze the following text and return a JSON object with these fields:
- sentiment: positive/negative/neutral
- key_topics: list of main topics
- word_count: number of words
Text: {text}
""")
json_parser = SimpleJsonOutputParser()
analysis_chain = analysis_prompt | llm | json_parser
# Process text
result = analysis_chain.invoke({
"text": "I love using LangChain for building AI applications. It makes development so much easier!"
})
print(result)
# Example output: {"sentiment": "positive", "key_topics": ["LangChain", "AI applications", "development"], "word_count": 15}
Chain with Transformation
from langchain.schema.runnable import RunnableLambda
def extract_keywords(text: str) -> dict:
"""Extract keywords from text"""
words = text.lower().split()
keywords = [word for word in words if len(word) > 4]
return {"keywords": keywords, "original_text": text}
def format_output(data: dict) -> str:
"""Format the final output"""
keywords = ", ".join(data["keywords"])
return f"Keywords found: {keywords}\nOriginal: {data['original_text']}"
# Chain with custom transformations
keyword_chain = (
RunnableLambda(extract_keywords) |
RunnableLambda(format_output)
)
result = keyword_chain.invoke("LangChain provides powerful tools for building intelligent applications")
print(result)
+ Parallel Operator (Concurrent Execution)
The parallel operator allows multiple operations to run simultaneously and combine results.
Basic Parallel Processing
from langchain.schema.runnable import RunnableParallel
# Define parallel prompts
summary_prompt = ChatPromptTemplate.from_template("Summarize this text: {text}")
keywords_prompt = ChatPromptTemplate.from_template("Extract key concepts from: {text}")
sentiment_prompt = ChatPromptTemplate.from_template("Analyze sentiment of: {text}")
# Create parallel chain
parallel_chain = RunnableParallel(
summary=summary_prompt | llm | StrOutputParser(),
keywords=keywords_prompt | llm | StrOutputParser(),
sentiment=sentiment_prompt | llm | StrOutputParser()
)
# Execute parallel operations
results = parallel_chain.invoke({
"text": "Artificial Intelligence is revolutionizing healthcare by enabling faster diagnosis and personalized treatments."
})
print("Summary:", results["summary"])
print("Keywords:", results["keywords"])
print("Sentiment:", results["sentiment"])Parallel with Post-Processing β
def combine_analysis(results: dict) -> str:
"""Combine parallel analysis results"""
combined = f"""
COMPREHENSIVE ANALYSIS
=====================
Summary: {results['summary']}
Key Concepts: {results['keywords']}
Sentiment: {results['sentiment']}
Analysis complete at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
return combined
# Parallel analysis with combination
comprehensive_chain = (
RunnableParallel(
summary=summary_prompt | llm | StrOutputParser(),
keywords=keywords_prompt | llm | StrOutputParser(),
sentiment=sentiment_prompt | llm | StrOutputParser()
) |
RunnableLambda(combine_analysis)
)
final_result = comprehensive_chain.invoke({
"text": "Machine learning models are becoming increasingly sophisticated and accessible."
})
print(final_result)
? Conditional Operator (Branching Logic)
Implement conditional logic in your chains based on input or intermediate results.
Basic Conditional Chain
from langchain.schema.runnable import RunnableBranch
def route_by_length(text: str) -> str:
"""Route based on text length"""
return "short" if len(text.split()) < 10 else "long"
# Define different processing for short vs long texts
short_prompt = ChatPromptTemplate.from_template("Give a brief response to: {text}")
long_prompt = ChatPromptTemplate.from_template("Provide a detailed analysis of: {text}")
# Conditional chain
conditional_chain = RunnableBranch(
(lambda x: len(x["text"].split()) < 10, short_prompt | llm | StrOutputParser()),
long_prompt | llm | StrOutputParser() # Default for long texts
)
# Test with short text
short_result = conditional_chain.invoke({"text": "Hello world"})
print("Short text result:", short_result)
# Test with long text
long_result = conditional_chain.invoke({
"text": "Artificial intelligence and machine learning are transforming industries across the globe by automating complex tasks and providing intelligent insights."
})
print("Long text result:", long_result)Advanced Conditional Logic β
from typing import Dict, Any
def classify_query_type(input_data: Dict[str, Any]) -> str:
"""Classify the type of query"""
query = input_data["query"].lower()
if any(word in query for word in ["calculate", "math", "compute"]):
return "calculation"
elif any(word in query for word in ["search", "find", "look up"]):
return "search"
elif any(word in query for word in ["analyze", "examine", "study"]):
return "analysis"
else:
return "general"
# Different chains for different query types
calculation_chain = ChatPromptTemplate.from_template(
"Solve this mathematical problem step by step: {query}"
) | llm | StrOutputParser()
search_chain = ChatPromptTemplate.from_template(
"I need to search for information about: {query}. What would be the best search strategy?"
) | llm | StrOutputParser()
analysis_chain = ChatPromptTemplate.from_template(
"Provide a thorough analysis of: {query}"
) | llm | StrOutputParser()
general_chain = ChatPromptTemplate.from_template(
"Please help me with: {query}"
) | llm | StrOutputParser()
# Smart routing chain
smart_chain = RunnableBranch(
(lambda x: classify_query_type(x) == "calculation", calculation_chain),
(lambda x: classify_query_type(x) == "search", search_chain),
(lambda x: classify_query_type(x) == "analysis", analysis_chain),
general_chain # Default
)
# Test different query types
queries = [
"Calculate the compound interest on $1000 at 5% for 3 years",
"Find information about quantum computing",
"Analyze the impact of social media on teenagers",
"Help me plan a vacation"
]
for query in queries:
result = smart_chain.invoke({"query": query})
print(f"Query: {query}")
print(f"Response: {result}\n")π Advanced LCEL Patterns β
Streaming and Async Operations
Basic Streaming
import asyncio
from langchain.schema.runnable import RunnablePassthrough
# Streaming chain
streaming_chain = (
ChatPromptTemplate.from_template("Write a story about {topic}") |
llm |
StrOutputParser()
)
# Stream results
async def stream_story():
async for chunk in streaming_chain.astream({"topic": "a robot learning to paint"}):
print(chunk, end="", flush=True)
print() # New line at the end
# Run streaming
# asyncio.run(stream_story())
Async Parallel Processing
async def async_analysis_chain(text: str) -> dict:
"""Perform async analysis"""
# Define async chains
summary_chain = summary_prompt | llm | StrOutputParser()
keywords_chain = keywords_prompt | llm | StrOutputParser()
sentiment_chain = sentiment_prompt | llm | StrOutputParser()
# Run in parallel
results = await asyncio.gather(
summary_chain.ainvoke({"text": text}),
keywords_chain.ainvoke({"text": text}),
sentiment_chain.ainvoke({"text": text})
)
return {
"summary": results[0],
"keywords": results[1],
"sentiment": results[2]
}
# Usage
async def run_async_analysis():
text = "The future of artificial intelligence looks promising with many exciting developments."
results = await async_analysis_chain(text)
print(results)
# asyncio.run(run_async_analysis())
Dynamic Chain Composition
Runtime Chain Building
from typing import Any, Dict, List

class DynamicChainBuilder:
def __init__(self, llm):
self.llm = llm
self.components = {
"prompts": {
"summary": ChatPromptTemplate.from_template("Summarize: {text}"),
"analysis": ChatPromptTemplate.from_template("Analyze: {text}"),
"translation": ChatPromptTemplate.from_template("Translate to {language}: {text}"),
"creative": ChatPromptTemplate.from_template("Write creatively about: {text}")
},
"parsers": {
"string": StrOutputParser(),
"json": SimpleJsonOutputParser()
}
}
    def build_chain(self, steps: List[str]) -> Any:
"""Build chain based on step specifications"""
chain = None
for step in steps:
if step in self.components["prompts"]:
component = self.components["prompts"][step] | self.llm | StrOutputParser()
elif step in self.components["parsers"]:
component = self.components["parsers"][step]
else:
continue
if chain is None:
chain = component
else:
chain = chain | component
return chain
    def build_parallel_chain(self, parallel_steps: Dict[str, str]) -> Any:
"""Build parallel chain with named outputs"""
parallel_dict = {}
for name, step in parallel_steps.items():
if step in self.components["prompts"]:
parallel_dict[name] = self.components["prompts"][step] | self.llm | StrOutputParser()
return RunnableParallel(parallel_dict)
# Usage
builder = DynamicChainBuilder(llm)
# Build sequential chain
sequential_chain = builder.build_chain(["summary", "analysis"])
# Build parallel chain
parallel_chain = builder.build_parallel_chain({
"summary": "summary",
"creative": "creative",
"analysis": "analysis"
})
# Test chains
text = "Renewable energy technologies are rapidly advancing and becoming more cost-effective."
sequential_result = sequential_chain.invoke({"text": text})
parallel_result = parallel_chain.invoke({"text": text})
print("Sequential result:", sequential_result)
print("Parallel results:", parallel_result)Configuration-Driven Chains β
import yaml
from typing import Dict, Any
class ConfigurableChain:
def __init__(self, llm, config: Dict[str, Any]):
self.llm = llm
self.config = config
self.chain = self._build_from_config()
def _build_from_config(self):
"""Build chain based on configuration"""
if self.config["type"] == "sequential":
return self._build_sequential()
elif self.config["type"] == "parallel":
return self._build_parallel()
elif self.config["type"] == "conditional":
return self._build_conditional()
else:
raise ValueError(f"Unknown chain type: {self.config['type']}")
def _build_sequential(self):
"""Build sequential chain from config"""
chain = None
for step in self.config["steps"]:
if step["type"] == "prompt":
component = ChatPromptTemplate.from_template(step["template"]) | self.llm | StrOutputParser()
elif step["type"] == "transform":
component = RunnableLambda(eval(step["function"]))
if chain is None:
chain = component
else:
chain = chain | component
return chain
def _build_parallel(self):
"""Build parallel chain from config"""
parallel_dict = {}
for name, step in self.config["branches"].items():
if step["type"] == "prompt":
parallel_dict[name] = ChatPromptTemplate.from_template(step["template"]) | self.llm | StrOutputParser()
chain = RunnableParallel(parallel_dict)
if "post_process" in self.config:
post_processor = RunnableLambda(eval(self.config["post_process"]))
chain = chain | post_processor
return chain
def run(self, input_data: Dict[str, Any]) -> Any:
"""Run the configured chain"""
return self.chain.invoke(input_data)
# Configuration examples
sequential_config = {
"type": "sequential",
"steps": [
{
"type": "prompt",
"template": "Extract key points from: {text}"
},
{
"type": "prompt",
"template": "Expand on these points: {text}"
}
]
}
parallel_config = {
"type": "parallel",
"branches": {
"summary": {
"type": "prompt",
"template": "Summarize: {text}"
},
"keywords": {
"type": "prompt",
"template": "Extract keywords from: {text}"
}
},
"post_process": "lambda x: f\"Summary: {x['summary']}\\nKeywords: {x['keywords']}\""
}
# Create and run configurable chains
sequential_chain = ConfigurableChain(llm, sequential_config)
parallel_chain = ConfigurableChain(llm, parallel_config)
text_input = {"text": "Climate change is affecting global weather patterns significantly."}
print("Sequential result:", sequential_chain.run(text_input))
print("Parallel result:", parallel_chain.run(text_input))π‘οΈ Error Handling & Validation β
Robust Error Handling
from langchain.schema.runnable import RunnablePassthrough
from typing import Union
class ErrorHandlingChain:
def __init__(self, llm):
self.llm = llm
def safe_invoke(self, chain, input_data: dict, fallback_response: str = None) -> Union[str, dict]:
"""Safely invoke chain with error handling"""
try:
return chain.invoke(input_data)
except Exception as e:
error_msg = f"Chain execution failed: {str(e)}"
if fallback_response:
return f"{error_msg}\nFallback: {fallback_response}"
return error_msg
def create_fallback_chain(self, primary_chain, fallback_chain):
"""Create chain with fallback option"""
def fallback_function(input_data: dict):
try:
return primary_chain.invoke(input_data)
except Exception:
return fallback_chain.invoke(input_data)
return RunnableLambda(fallback_function)
def create_retry_chain(self, chain, max_retries: int = 3):
"""Create chain with retry logic"""
def retry_function(input_data: dict):
last_error = None
for attempt in range(max_retries):
try:
return chain.invoke(input_data)
except Exception as e:
last_error = e
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed, retrying...")
continue
return f"All {max_retries} attempts failed. Last error: {str(last_error)}"
return RunnableLambda(retry_function)
# Usage
error_handler = ErrorHandlingChain(llm)
# Primary chain (might fail)
risky_chain = ChatPromptTemplate.from_template("Process this complex request: {text}") | llm | StrOutputParser()
# Fallback chain (simple, reliable)
safe_chain = ChatPromptTemplate.from_template("I can help with: {text}") | llm | StrOutputParser()
# Create fallback chain
robust_chain = error_handler.create_fallback_chain(risky_chain, safe_chain)
# Create retry chain
retry_chain = error_handler.create_retry_chain(risky_chain, max_retries=2)
# Test error handling
test_input = {"text": "Very complex and potentially problematic input"}
result = robust_chain.invoke(test_input)
print("Robust chain result:", result)β Input Validation β
from pydantic import BaseModel, validator
from typing import Optional, List
class InputValidator:
def __init__(self):
pass
def validate_text_input(self, input_data: dict) -> dict:
"""Validate text input"""
text = input_data.get("text", "")
# Basic validation
if not text or not text.strip():
raise ValueError("Text input cannot be empty")
if len(text) > 10000:
raise ValueError("Text input too long (max 10,000 characters)")
# Clean text
cleaned_text = text.strip()
return {"text": cleaned_text, **{k: v for k, v in input_data.items() if k != "text"}}
def validate_query_input(self, input_data: dict) -> dict:
"""Validate query input"""
query = input_data.get("query", "")
if not query or not query.strip():
raise ValueError("Query cannot be empty")
# Check for potentially harmful content
        harmful_patterns = ["<script>", "drop table", "delete from"]
        if any(pattern in query.lower() for pattern in harmful_patterns):
raise ValueError("Query contains potentially harmful content")
return {"query": query.strip(), **{k: v for k, v in input_data.items() if k != "query"}}
# Pydantic model for structured validation
class TextAnalysisInput(BaseModel):
text: str
language: Optional[str] = "en"
analysis_type: str = "general"
max_length: Optional[int] = 5000
@validator('text')
def text_must_not_be_empty(cls, v):
if not v or not v.strip():
raise ValueError('Text cannot be empty')
return v.strip()
@validator('analysis_type')
def analysis_type_must_be_valid(cls, v):
valid_types = ['general', 'sentiment', 'summary', 'keywords']
if v not in valid_types:
raise ValueError(f'Analysis type must be one of: {valid_types}')
return v
@validator('text')
def text_length_check(cls, v, values):
max_len = values.get('max_length', 5000)
if len(v) > max_len:
            raise ValueError(f'Text length exceeds maximum of {max_len} characters')
return v
def create_validated_chain(chain, validator_func):
"""Create chain with input validation"""
def validated_invoke(input_data: dict):
try:
# Validate input
validated_input = validator_func(input_data)
# Run chain with validated input
return chain.invoke(validated_input)
except ValueError as e:
return f"Validation error: {str(e)}"
except Exception as e:
return f"Execution error: {str(e)}"
return RunnableLambda(validated_invoke)
# Usage
validator = InputValidator()
# Create validated chain
base_chain = ChatPromptTemplate.from_template("Analyze this text: {text}") | llm | StrOutputParser()
validated_chain = create_validated_chain(base_chain, validator.validate_text_input)
# Test validation
valid_input = {"text": "This is a valid text input for analysis."}
invalid_input = {"text": ""}
print("Valid input result:", validated_chain.invoke(valid_input))
print("Invalid input result:", validated_chain.invoke(invalid_input))
# Pydantic validation
def pydantic_validator(input_data: dict) -> dict:
"""Pydantic-based validation"""
validated = TextAnalysisInput(**input_data)
return validated.dict()
pydantic_chain = create_validated_chain(base_chain, pydantic_validator)
Real-World LCEL Applications
Document Processing Pipeline
class DocumentProcessingPipeline:
def __init__(self, llm):
self.llm = llm
self.pipeline = self._build_pipeline()
def _build_pipeline(self):
"""Build comprehensive document processing pipeline"""
# Step 1: Document classification
classify_prompt = ChatPromptTemplate.from_template("""
Classify this document into one of these categories:
- technical (programming, engineering, scientific)
- business (reports, proposals, legal)
- creative (stories, articles, blogs)
- academic (research papers, thesis, studies)
Document: {text}
Return only the category name.
""")
# Step 2: Content analysis based on classification
technical_analysis = ChatPromptTemplate.from_template("""
Analyze this technical document and extract:
1. Key technologies mentioned
2. Technical concepts
3. Implementation details
4. Potential issues or recommendations
Document: {text}
""")
business_analysis = ChatPromptTemplate.from_template("""
Analyze this business document and extract:
1. Key objectives
2. Financial implications
3. Stakeholders mentioned
4. Action items or decisions
Document: {text}
""")
creative_analysis = ChatPromptTemplate.from_template("""
Analyze this creative document and extract:
1. Main themes
2. Writing style and tone
3. Target audience
4. Key messages or takeaways
Document: {text}
""")
academic_analysis = ChatPromptTemplate.from_template("""
Analyze this academic document and extract:
1. Research questions or hypotheses
2. Methodology mentioned
3. Key findings or arguments
4. Citations or references to other work
Document: {text}
""")
# Build conditional analysis chain
analysis_chain = RunnableBranch(
(lambda x: x["category"] == "technical", technical_analysis | self.llm | StrOutputParser()),
(lambda x: x["category"] == "business", business_analysis | self.llm | StrOutputParser()),
(lambda x: x["category"] == "creative", creative_analysis | self.llm | StrOutputParser()),
academic_analysis | self.llm | StrOutputParser() # Default for academic
)
# Step 3: Generate summary and keywords in parallel
summary_chain = ChatPromptTemplate.from_template(
"Create a concise summary of this document: {text}"
) | self.llm | StrOutputParser()
keywords_chain = ChatPromptTemplate.from_template(
"Extract 5-10 key terms and concepts from this document: {text}"
) | self.llm | StrOutputParser()
# Step 4: Combine all results
def combine_results(data: dict) -> dict:
return {
"document_type": data["category"],
"detailed_analysis": data["analysis"],
"summary": data["summary"],
"keywords": data["keywords"],
"processing_timestamp": datetime.now().isoformat()
}
# Build complete pipeline
pipeline = (
RunnablePassthrough.assign(
category=classify_prompt | self.llm | StrOutputParser()
) |
RunnablePassthrough.assign(
analysis=analysis_chain
) |
RunnableParallel(
category=lambda x: x["category"],
analysis=lambda x: x["analysis"],
summary=summary_chain,
keywords=keywords_chain
) |
RunnableLambda(combine_results)
)
return pipeline
def process_document(self, document_text: str) -> dict:
"""Process a document through the complete pipeline"""
return self.pipeline.invoke({"text": document_text})
# Usage
doc_processor = DocumentProcessingPipeline(llm)
sample_document = """
Machine learning models require careful preprocessing of training data to achieve optimal performance.
Feature engineering involves transforming raw data into meaningful representations that algorithms can effectively utilize.
Common preprocessing steps include normalization, handling missing values, and encoding categorical variables.
The choice of preprocessing techniques significantly impacts model accuracy and generalization capabilities.
"""
result = doc_processor.process_document(sample_document)
print("Processing result:", result)π€ Conversational AI with Context β
class ContextualConversationChain:
def __init__(self, llm):
self.llm = llm
self.conversation_history = []
self.chain = self._build_conversational_chain()
def _build_conversational_chain(self):
"""Build contextual conversation chain"""
# Context preparation
def prepare_context(input_data: dict) -> dict:
current_message = input_data["message"]
# Format conversation history
if self.conversation_history:
context = "Previous conversation:\n"
for i, (user_msg, ai_msg) in enumerate(self.conversation_history[-3:]): # Last 3 exchanges
context += f"User: {user_msg}\nAI: {ai_msg}\n"
context += f"\nCurrent message: {current_message}"
else:
context = f"Current message: {current_message}"
return {"context": context, "message": current_message}
# Intent classification
intent_prompt = ChatPromptTemplate.from_template("""
Classify the intent of this message:
- question (asking for information)
- request (asking for action)
- casual (casual conversation)
- follow_up (following up on previous conversation)
Context: {context}
Return only the intent category.
""")
# Response generation based on intent
question_prompt = ChatPromptTemplate.from_template("""
Answer this question based on the conversation context:
{context}
Provide a helpful, informative response.
""")
request_prompt = ChatPromptTemplate.from_template("""
Respond to this request based on the conversation context:
{context}
Explain what you can help with and provide guidance.
""")
casual_prompt = ChatPromptTemplate.from_template("""
Continue this casual conversation naturally:
{context}
Be friendly and engaging.
""")
follow_up_prompt = ChatPromptTemplate.from_template("""
This is a follow-up to previous conversation. Respond appropriately:
{context}
Reference previous context as needed and provide a coherent response.
""")
# Build conditional response chain
response_chain = RunnableBranch(
(lambda x: x["intent"] == "question", question_prompt | self.llm | StrOutputParser()),
(lambda x: x["intent"] == "request", request_prompt | self.llm | StrOutputParser()),
(lambda x: x["intent"] == "casual", casual_prompt | self.llm | StrOutputParser()),
follow_up_prompt | self.llm | StrOutputParser() # Default for follow_up
)
# Complete conversation chain
conversation_chain = (
RunnableLambda(prepare_context) |
RunnablePassthrough.assign(
intent=intent_prompt | self.llm | StrOutputParser()
) |
RunnablePassthrough.assign(
response=response_chain
)
)
return conversation_chain
def chat(self, message: str) -> str:
"""Have a conversation with context awareness"""
result = self.chain.invoke({"message": message})
# Store conversation history
self.conversation_history.append((message, result["response"]))
# Keep only recent history (last 10 exchanges)
if len(self.conversation_history) > 10:
self.conversation_history = self.conversation_history[-10:]
return result["response"]
def reset_conversation(self):
"""Reset conversation history"""
self.conversation_history = []
# Usage
conversational_ai = ContextualConversationChain(llm)
# Have a conversation
print("AI:", conversational_ai.chat("Hello! I'm interested in learning about machine learning."))
print("AI:", conversational_ai.chat("What's the difference between supervised and unsupervised learning?"))
print("AI:", conversational_ai.chat("Can you give me an example of supervised learning?"))
print("AI:", conversational_ai.chat("That's helpful! What about unsupervised learning examples?"))π Multi-Modal Analysis Pipeline β
class MultiModalAnalysisPipeline:
def __init__(self, llm):
self.llm = llm
self.pipeline = self._build_pipeline()
def _build_pipeline(self):
"""Build pipeline for analyzing different data types"""
# Data type detection
def detect_data_type(input_data: dict) -> dict:
data = input_data["data"]
if isinstance(data, str):
if data.startswith("http"):
data_type = "url"
elif "," in data and "\n" in data:
data_type = "csv"
else:
data_type = "text"
elif isinstance(data, dict):
data_type = "json"
elif isinstance(data, list):
data_type = "list"
else:
data_type = "unknown"
return {"data": data, "data_type": data_type}
# Text analysis
text_analysis_prompt = ChatPromptTemplate.from_template("""
Analyze this text content:
1. Main topics and themes
2. Sentiment and tone
3. Key insights
4. Potential applications
Text: {data}
""")
# URL analysis
url_analysis_prompt = ChatPromptTemplate.from_template("""
This appears to be a URL. Provide analysis on:
1. What type of content this might contain
2. Potential analysis approaches
3. Data extraction strategies
4. Relevance assessment
URL: {data}
""")
# CSV analysis
csv_analysis_prompt = ChatPromptTemplate.from_template("""
This appears to be CSV data. Provide analysis on:
1. Data structure and columns
2. Potential data quality issues
3. Analysis opportunities
4. Visualization suggestions
CSV Data: {data}
""")
# JSON analysis
json_analysis_prompt = ChatPromptTemplate.from_template("""
Analyze this JSON data structure:
1. Schema and data types
2. Hierarchical relationships
3. Potential insights
4. Processing recommendations
JSON: {data}
""")
# Build conditional analysis chain
analysis_chain = RunnableBranch(
(lambda x: x["data_type"] == "text", text_analysis_prompt | self.llm | StrOutputParser()),
(lambda x: x["data_type"] == "url", url_analysis_prompt | self.llm | StrOutputParser()),
(lambda x: x["data_type"] == "csv", csv_analysis_prompt | self.llm | StrOutputParser()),
(lambda x: x["data_type"] == "json", json_analysis_prompt | self.llm | StrOutputParser()),
ChatPromptTemplate.from_template("Data type not recognized: {data}") | self.llm | StrOutputParser()
)
# Metadata generation
def generate_metadata(data: dict) -> dict:
return {
**data,
"analysis_timestamp": datetime.now().isoformat(),
"data_size": len(str(data["data"])),
"processing_type": "multi_modal_analysis"
}
# Complete pipeline
pipeline = (
RunnableLambda(detect_data_type) |
RunnablePassthrough.assign(
analysis=analysis_chain
) |
RunnableLambda(generate_metadata)
)
return pipeline
def analyze(self, data: any) -> dict:
"""Analyze data of any supported type"""
return self.pipeline.invoke({"data": data})
# Usage
multimodal_analyzer = MultiModalAnalysisPipeline(llm)
# Test different data types
text_data = "Renewable energy adoption is accelerating globally with solar and wind leading the transition."
csv_data = "name,age,department\nJohn,30,Engineering\nJane,25,Marketing\nBob,35,Sales"
json_data = {"user": "alice", "actions": ["login", "view_dashboard", "generate_report"], "timestamp": "2024-01-15"}
url_data = "https://example.com/api/data/climate_trends"
# Analyze each data type
print("Text Analysis:", multimodal_analyzer.analyze(text_data))
print("\nCSV Analysis:", multimodal_analyzer.analyze(csv_data))
print("\nJSON Analysis:", multimodal_analyzer.analyze(json_data))
print("\nURL Analysis:", multimodal_analyzer.analyze(url_data))π Getting Started with LCEL β
Simple LCEL Examples
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.llms import OpenAI
# Initialize components
llm = OpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("Explain {concept} in simple terms")
parser = StrOutputParser()
# Basic chain
simple_chain = prompt | llm | parser
# Execute
result = simple_chain.invoke({"concept": "blockchain technology"})
print(result)
# Parallel processing
from langchain.schema.runnable import RunnableParallel
parallel_chain = RunnableParallel(
explanation=prompt | llm | parser,
summary=ChatPromptTemplate.from_template("Summarize {concept}") | llm | parser
)
parallel_result = parallel_chain.invoke({"concept": "artificial intelligence"})
print(parallel_result)
Next Steps
- Master Basic Operators: Practice with pipe, parallel, and conditional operators
- Build Complex Chains: Combine multiple operators for sophisticated workflows
- Implement Error Handling: Add robust error handling and validation
- Optimize Performance: Use async operations and streaming for better performance
- Create Custom Components: Build domain-specific LCEL components
Additional Resources
- LCEL Performance Guide: Optimization techniques and benchmarks
- Advanced Patterns: Complex workflow examples and best practices
- Debugging LCEL: Tools and techniques for troubleshooting chains
- Production Deployment: Scaling LCEL applications
Master LCEL to build sophisticated, maintainable, and high-performance LangChain applications with declarative workflow orchestration.