Advanced RAG - Sophisticated Retrieval Patterns
Master advanced Retrieval-Augmented Generation techniques including multi-hop reasoning, adaptive retrieval, and production-scale RAG architectures
Understanding Advanced RAG
Advanced RAG goes beyond simple document retrieval to implement sophisticated reasoning patterns, adaptive strategies, and production-optimized architectures that can handle complex queries and large-scale knowledge bases.
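To see what these patterns improve on, here is a minimal single-step baseline in LangChain Expression Language. This is a sketch, not part of the systems below: the sample documents, prompt wording, and k value are illustrative, and it assumes OpenAI credentials are configured.
python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Illustrative mini knowledge base (any small document set works here)
docs = [
    Document(page_content="RAG combines document retrieval with text generation."),
    Document(page_content="Vector stores index embeddings for similarity search."),
]
vectorstore = Chroma.from_documents(docs, OpenAIEmbeddings())
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

prompt = ChatPromptTemplate.from_template(
    "Answer using only this context:\n{context}\n\nQuestion: {question}"
)

def format_docs(documents):
    """Join retrieved documents into a single context string."""
    return "\n\n".join(doc.page_content for doc in documents)

# Single-step pipeline: one retrieval, one generation, no query analysis
basic_rag = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(temperature=0, model="gpt-3.5-turbo")
    | StrOutputParser()
)

print(basic_rag.invoke("What is RAG?"))
Every pattern in this section replaces one or more stages of this fixed pipeline with an adaptive step.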
Evolution of RAG Systems
text
                         RAG SYSTEM EVOLUTION
                      (From simple to sophisticated)
┌───────────────────────────────────────────────────────────────────┐
│                             BASIC RAG                             │
│                       (Single-step retrieval)                     │
│                                                                   │
│   Query → Retrieve → Generate → Response                          │
│                                                                   │
│   ✓ Simple implementation                                         │
│   ✗ Limited reasoning capability                                  │
│   ✗ No query refinement                                           │
│   ✗ Single information source                                     │
└───────────────────────┬───────────────────────────────────────────┘
                        │
                        ▼ ADVANCED PATTERNS
┌───────────────────────────────────────────────────────────────────┐
│                            ADVANCED RAG                           │
│                  (Multi-step, adaptive, optimized)                │
│                                                                   │
│  Query → Analyze → Plan → Multi-Retrieve → Reason → Synthesize    │
│    │        │        │          │            │           │        │
│  Context   Route  Strategy   Sources      Evidence     Final      │
│  Aware     Query   Select    Multiple      Chain       Answer     │
│                                                                   │
│   ✓ Complex reasoning                                             │
│   ✓ Adaptive strategies                                           │
│   ✓ Multi-source synthesis                                        │
│   ✓ Production optimized                                          │
└───────────────────────────────────────────────────────────────────┘

Multi-Hop RAG Patterns
Sequential Reasoning RAG
python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import Dict, List, Any, Optional
import json
from datetime import datetime
class MultiHopRAG:
"""RAG system that performs multi-step reasoning"""
def __init__(self, vectorstore, llm, max_hops: int = 3):
self.vectorstore = vectorstore
self.llm = llm
self.max_hops = max_hops
self.reasoning_history = []
# Prompts for different reasoning steps
self.query_analyzer = self._create_query_analyzer()
self.sub_query_generator = self._create_sub_query_generator()
self.evidence_synthesizer = self._create_evidence_synthesizer()
self.final_answerer = self._create_final_answerer()
def _create_query_analyzer(self):
"""Create query analysis chain"""
prompt = ChatPromptTemplate.from_template("""
Analyze this query to determine if it requires multi-step reasoning:
Query: {query}
Determine:
1. Is this a simple factual question or complex reasoning question?
2. What are the key concepts that need to be researched?
3. What is the logical sequence of sub-questions needed?
4. How many reasoning steps are likely required?
Respond in JSON format:
{{
"complexity": "simple|moderate|complex",
"key_concepts": ["concept1", "concept2"],
"reasoning_steps": ["step1", "step2"],
"estimated_hops": 1-3
}}
""")
return prompt | self.llm | StrOutputParser()
def _create_sub_query_generator(self):
"""Create sub-query generation chain"""
prompt = ChatPromptTemplate.from_template("""
Based on the analysis and current context, generate the next sub-query:
Original Query: {original_query}
Analysis: {analysis}
Current Step: {current_step}
Previous Evidence: {previous_evidence}
Generate a specific sub-query that will help answer the original question.
The sub-query should be focused and retrievable from documents.
Sub-query:""")
return prompt | self.llm | StrOutputParser()
def _create_evidence_synthesizer(self):
"""Create evidence synthesis chain"""
prompt = ChatPromptTemplate.from_template("""
Synthesize the retrieved evidence to answer the sub-query:
Sub-query: {sub_query}
Retrieved Documents:
{documents}
Provide:
1. Direct answer to the sub-query
2. Key evidence supporting the answer
3. Confidence level (high/medium/low)
4. Whether additional information is needed
Synthesis:""")
return prompt | self.llm | StrOutputParser()
def _create_final_answerer(self):
"""Create final answer generation chain"""
prompt = ChatPromptTemplate.from_template("""
Generate a comprehensive answer using all gathered evidence:
Original Query: {original_query}
Evidence Chain:
{evidence_chain}
Instructions:
1. Provide a complete answer to the original query
2. Show logical reasoning steps
3. Cite evidence from multiple sources
4. Acknowledge any limitations or uncertainties
Final Answer:""")
return prompt | self.llm | StrOutputParser()
def query(self, query: str) -> Dict[str, Any]:
"""Perform multi-hop reasoning query"""
self.reasoning_history = []
start_time = datetime.now()
# Step 1: Analyze query complexity
analysis_result = self.query_analyzer.invoke({"query": query})
try:
analysis = json.loads(analysis_result)
except json.JSONDecodeError:
# Fallback for non-JSON responses
analysis = {
"complexity": "moderate",
"key_concepts": [query],
"reasoning_steps": ["basic_retrieval"],
"estimated_hops": 1
}
print(f"π Query Analysis: {analysis}")
# Step 2: Multi-hop reasoning
evidence_chain = []
current_query = query
for hop in range(min(analysis.get("estimated_hops", 1), self.max_hops)):
print(f"\nπ Reasoning Hop {hop + 1}/{self.max_hops}")
# Generate sub-query for current hop
if hop > 0:
sub_query = self.sub_query_generator.invoke({
"original_query": query,
"analysis": json.dumps(analysis),
"current_step": hop + 1,
"previous_evidence": json.dumps(evidence_chain)
})
else:
sub_query = query
print(f"Sub-query: {sub_query}")
# Retrieve documents for sub-query
retrieved_docs = self.vectorstore.similarity_search(sub_query, k=5)
if not retrieved_docs:
print(f"β οΈ No documents found for: {sub_query}")
break
# Format documents
docs_text = "\n\n".join([
f"Document {i+1}: {doc.page_content}"
for i, doc in enumerate(retrieved_docs)
])
# Synthesize evidence
evidence = self.evidence_synthesizer.invoke({
"sub_query": sub_query,
"documents": docs_text
})
evidence_entry = {
"hop": hop + 1,
"sub_query": sub_query,
"retrieved_docs": len(retrieved_docs),
"evidence": evidence,
"timestamp": datetime.now()
}
evidence_chain.append(evidence_entry)
self.reasoning_history.append(evidence_entry)
print(f"Evidence: {evidence[:200]}...")
# Check if we have enough information
if "no additional information" in evidence.lower() or hop >= self.max_hops - 1:
break
# Step 3: Generate final answer
evidence_chain_text = "\n\n".join([
f"Step {e['hop']}: {e['sub_query']}\nEvidence: {e['evidence']}"
for e in evidence_chain
])
final_answer = self.final_answerer.invoke({
"original_query": query,
"evidence_chain": evidence_chain_text
})
total_time = (datetime.now() - start_time).total_seconds()
return {
"query": query,
"analysis": analysis,
"reasoning_hops": len(evidence_chain),
"evidence_chain": evidence_chain,
"final_answer": final_answer,
"processing_time": total_time,
"total_documents_retrieved": sum(e["retrieved_docs"] for e in evidence_chain)
}
# Demo multi-hop RAG
def demo_multi_hop_rag():
"""Demonstrate multi-hop reasoning RAG"""
# Create sample knowledge base
sample_docs = [
"Machine learning is a subset of artificial intelligence that uses algorithms to learn patterns from data.",
"Neural networks are computing systems inspired by biological neural networks. They consist of layers of interconnected nodes.",
"Deep learning uses neural networks with multiple hidden layers to learn complex patterns.",
"Transformers are a type of neural network architecture that uses attention mechanisms for processing sequences.",
"GPT (Generative Pre-trained Transformer) models are built on the transformer architecture.",
"Large language models like GPT-3 and GPT-4 are trained on vast amounts of text data.",
"Transfer learning allows models pre-trained on one task to be adapted for related tasks.",
"Fine-tuning is a form of transfer learning where a pre-trained model is further trained on specific data.",
"Prompt engineering involves designing effective inputs to get desired outputs from language models.",
"Few-shot learning enables models to perform tasks with only a few examples.",
"BERT (Bidirectional Encoder Representations from Transformers) is designed for understanding context.",
"Attention mechanisms allow models to focus on relevant parts of the input when making predictions."
]
# Create vector store
from langchain_core.documents import Document
documents = [Document(page_content=doc) for doc in sample_docs]
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)
# Create multi-hop RAG system
llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo")
multi_hop_rag = MultiHopRAG(vectorstore, llm, max_hops=3)
print("π Multi-Hop RAG Demo:")
print("======================")
# Test complex questions requiring multi-step reasoning
complex_queries = [
"How does the architecture of transformers relate to the capabilities of large language models like GPT?",
"What is the connection between neural networks, deep learning, and modern AI applications?",
"How do attention mechanisms in transformers enable few-shot learning capabilities?"
]
for i, query in enumerate(complex_queries, 1):
print(f"\n--- Complex Query {i} ---")
print(f"Query: {query}")
result = multi_hop_rag.query(query)
print(f"\nπ Results Summary:")
print(f"Reasoning hops: {result['reasoning_hops']}")
print(f"Documents retrieved: {result['total_documents_retrieved']}")
print(f"Processing time: {result['processing_time']:.2f}s")
print(f"\nπ― Final Answer:")
print(result['final_answer'])
print("\n" + "="*60)
return multi_hop_rag
multi_hop_rag = demo_multi_hop_rag()

Adaptive Retrieval Strategy
python
class AdaptiveRAG:
"""RAG system that adapts retrieval strategy based on query type"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
# Different retrieval strategies
self.strategies = {
"factual": self._factual_strategy,
"analytical": self._analytical_strategy,
"comparative": self._comparative_strategy,
"synthetic": self._synthetic_strategy
}
self.query_classifier = self._create_query_classifier()
self.strategy_selector = self._create_strategy_selector()
def _create_query_classifier(self):
"""Create query classification chain"""
prompt = ChatPromptTemplate.from_template("""
Classify this query into one of these categories:
Query: {query}
Categories:
1. factual: Direct factual questions (What is X? When did Y happen?)
2. analytical: Questions requiring analysis (Why does X work? How does Y affect Z?)
3. comparative: Questions comparing entities (What's the difference between X and Y?)
4. synthetic: Questions requiring synthesis of multiple concepts
Consider:
- Question words (what, how, why, compare, etc.)
- Complexity of reasoning required
- Number of concepts involved
Respond with just the category name: factual, analytical, comparative, or synthetic
""")
return prompt | self.llm | StrOutputParser()
def _create_strategy_selector(self):
"""Create strategy selection chain"""
prompt = ChatPromptTemplate.from_template("""
Based on the query type, select the optimal retrieval strategy:
Query: {query}
Query Type: {query_type}
Available strategies:
1. factual: Single-step retrieval with exact matching
2. analytical: Multi-step retrieval with reasoning chains
3. comparative: Parallel retrieval of compared entities
4. synthetic: Broad retrieval with concept synthesis
Strategy parameters to consider:
- Number of retrieval steps
- Number of documents per step
- Similarity threshold
- Document diversity requirements
Provide strategy configuration in JSON:
{{
"strategy": "strategy_name",
"parameters": {{
"retrieval_steps": 1-3,
"docs_per_step": 3-8,
"similarity_threshold": 0.7-0.9,
"diversity_weight": 0.0-1.0
}}
}}
""")
return prompt | self.llm | StrOutputParser()
def _factual_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Simple factual retrieval strategy"""
docs = self.vectorstore.similarity_search(
query,
k=params.get("docs_per_step", 3)
)
return [{
"step": 1,
"query": query,
"documents": docs,
"strategy": "direct_similarity"
}]
def _analytical_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Multi-step analytical retrieval"""
results = []
# Step 1: Get basic information
basic_docs = self.vectorstore.similarity_search(query, k=3)
results.append({
"step": 1,
"query": query,
"documents": basic_docs,
"strategy": "basic_facts"
})
# Step 2: Get detailed analysis
analysis_query = f"detailed analysis explanation reasoning {query}"
analysis_docs = self.vectorstore.similarity_search(analysis_query, k=4)
results.append({
"step": 2,
"query": analysis_query,
"documents": analysis_docs,
"strategy": "detailed_analysis"
})
return results
def _comparative_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parallel comparative retrieval"""
results = []
# Extract entities to compare (simplified)
comparison_prompt = f"""
Extract the main entities being compared in this query: {query}
List them as: entity1, entity2, ...
"""
entities_result = self.llm.invoke(comparison_prompt).content
entities = [e.strip() for e in entities_result.split(',')]
# Retrieve for each entity
for i, entity in enumerate(entities[:3], 1): # Limit to 3 entities
entity_docs = self.vectorstore.similarity_search(entity, k=3)
results.append({
"step": i,
"query": entity,
"documents": entity_docs,
"strategy": f"entity_comparison_{entity}"
})
return results
def _synthetic_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Broad synthesis retrieval"""
results = []
# Step 1: Broad concept retrieval
broad_docs = self.vectorstore.similarity_search(query, k=6)
results.append({
"step": 1,
"query": query,
"documents": broad_docs,
"strategy": "broad_concepts"
})
# Step 2: Related concepts with MMR for diversity
if hasattr(self.vectorstore, 'max_marginal_relevance_search'):
diverse_docs = self.vectorstore.max_marginal_relevance_search(
query, k=4, fetch_k=10
)
results.append({
"step": 2,
"query": f"related concepts {query}",
"documents": diverse_docs,
"strategy": "diverse_related"
})
return results
def query(self, query: str) -> Dict[str, Any]:
"""Perform adaptive RAG query"""
start_time = datetime.now()
# Step 1: Classify query
query_type = self.query_classifier.invoke({"query": query}).strip().lower()
print(f"π·οΈ Query Type: {query_type}")
# Step 2: Select strategy
try:
strategy_config = self.strategy_selector.invoke({
"query": query,
"query_type": query_type
})
strategy_data = json.loads(strategy_config)
except json.JSONDecodeError:
# Fallback to query type mapping
strategy_data = {
"strategy": query_type if query_type in self.strategies else "factual",
"parameters": {
"retrieval_steps": 1,
"docs_per_step": 4,
"similarity_threshold": 0.8,
"diversity_weight": 0.3
}
}
print(f"π― Strategy: {strategy_data}")
# Step 3: Execute retrieval strategy
strategy_name = strategy_data["strategy"]
if strategy_name not in self.strategies:
strategy_name = "factual" # Fallback
retrieval_results = self.strategies[strategy_name](
query,
strategy_data.get("parameters", {})
)
# Step 4: Generate response
all_docs = []
for result in retrieval_results:
all_docs.extend(result["documents"])
# Remove duplicates
unique_docs = []
seen_content = set()
for doc in all_docs:
content_hash = hash(doc.page_content[:100])
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
# Format context
context = "\n\n".join([
f"Source {i+1}: {doc.page_content}"
for i, doc in enumerate(unique_docs[:8])
])
# Generate answer
answer_prompt = ChatPromptTemplate.from_template("""
Answer the query using the provided context, adapting your response style to the query type.
Query Type: {query_type}
Query: {query}
Strategy Used: {strategy}
Context:
{context}
Instructions based on query type:
- factual: Provide direct, concise answers with citations
- analytical: Explain reasoning and mechanisms
- comparative: Highlight similarities and differences
- synthetic: Integrate multiple concepts coherently
Answer:""")
answer = (answer_prompt | self.llm | StrOutputParser()).invoke({
"query_type": query_type,
"query": query,
"strategy": strategy_data["strategy"],
"context": context
})
processing_time = (datetime.now() - start_time).total_seconds()
return {
"query": query,
"query_type": query_type,
"strategy": strategy_data,
"retrieval_results": retrieval_results,
"unique_documents": len(unique_docs),
"answer": answer,
"processing_time": processing_time
}
# Demo adaptive RAG
def demo_adaptive_rag():
"""Demonstrate adaptive RAG strategies"""
# Reuse the vectorstore from previous demo
sample_docs = [
"Machine learning algorithms learn patterns from data to make predictions or decisions.",
"Supervised learning uses labeled data to train models, while unsupervised learning finds patterns in unlabeled data.",
"Neural networks consist of interconnected nodes organized in layers that process information.",
"Deep learning uses neural networks with many layers to learn complex representations.",
"Convolutional Neural Networks (CNNs) are excellent for image processing and computer vision.",
"Recurrent Neural Networks (RNNs) are designed for sequential data like text and time series.",
"Transformers use attention mechanisms to process sequences more efficiently than RNNs.",
"BERT and GPT are transformer-based models with different architectures and training objectives.",
"Reinforcement learning trains agents to make decisions through trial and error with rewards.",
"Transfer learning adapts pre-trained models to new tasks with limited data.",
"Overfitting occurs when models memorize training data but fail to generalize to new data.",
"Regularization techniques help prevent overfitting and improve model generalization."
]
from langchain_core.documents import Document
documents = [Document(page_content=doc) for doc in sample_docs]
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)
# Create adaptive RAG system
llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo")
adaptive_rag = AdaptiveRAG(vectorstore, llm)
print("π― Adaptive RAG Demo:")
print("====================")
# Test different query types
test_queries = [
("What is machine learning?", "factual"),
("How do neural networks learn patterns from data?", "analytical"),
("What's the difference between CNNs and RNNs?", "comparative"),
("How do transformers, attention mechanisms, and transfer learning work together in modern AI?", "synthetic")
]
for i, (query, expected_type) in enumerate(test_queries, 1):
print(f"\n--- Query {i} ---")
print(f"Query: {query}")
print(f"Expected Type: {expected_type}")
result = adaptive_rag.query(query)
print(f"Detected Type: {result['query_type']}")
print(f"Strategy: {result['strategy']['strategy']}")
print(f"Documents: {result['unique_documents']}")
print(f"Processing Time: {result['processing_time']:.2f}s")
print(f"\nAnswer: {result['answer'][:300]}...")
print("\n" + "="*50)
return adaptive_rag
adaptive_rag = demo_adaptive_rag()

Production RAG Optimizations
Hierarchical RAG Architecture
python
from typing import Tuple
class HierarchicalRAG:
"""Multi-level RAG system with hierarchical document organization"""
def __init__(self, documents: List[Document], embeddings):
self.embeddings = embeddings
self.llm = ChatOpenAI(temperature=0.1)
# Build hierarchical structure
self.document_hierarchy = self._build_hierarchy(documents)
self.level_retrievers = self._create_level_retrievers()
def _build_hierarchy(self, documents: List[Document]) -> Dict[str, Any]:
"""Build hierarchical document structure"""
# Level 1: Document summaries
summaries = []
for doc in documents:
summary_prompt = f"""
Create a concise summary of this document content:
{doc.page_content[:500]}...
Summary (1-2 sentences):"""
summary = self.llm.invoke(summary_prompt).content
summaries.append(Document(
page_content=summary,
metadata={**doc.metadata, "level": "summary", "original_doc_id": len(summaries)}
))
# Level 2: Concept clusters
clusters = self._create_concept_clusters(documents)
# Level 3: Original documents
full_docs = [
Document(
page_content=doc.page_content,
metadata={**doc.metadata, "level": "full", "doc_id": i}
)
for i, doc in enumerate(documents)
]
return {
"level_1_summaries": summaries,
"level_2_clusters": clusters,
"level_3_full": full_docs
}
def _create_concept_clusters(self, documents: List[Document]) -> List[Document]:
"""Create concept-based document clusters"""
# Simple clustering based on keyword overlap
clusters = {}
for i, doc in enumerate(documents):
# Extract key concepts (simplified)
concepts_prompt = f"""
Extract 3-5 key concepts from this text:
{doc.page_content[:300]}...
Concepts (comma-separated):"""
concepts_text = self.llm.invoke(concepts_prompt).content
concepts = [c.strip().lower() for c in concepts_text.split(',')]
# Assign to clusters
for concept in concepts:
if concept not in clusters:
clusters[concept] = []
clusters[concept].append(i)
# Create cluster documents
cluster_docs = []
for concept, doc_ids in clusters.items():
if len(doc_ids) > 1: # Only multi-document clusters
cluster_content = f"Concept: {concept}\nRelated documents: {len(doc_ids)}\n"
cluster_content += f"Document IDs: {doc_ids}"
cluster_docs.append(Document(
page_content=cluster_content,
metadata={"level": "cluster", "concept": concept, "doc_ids": doc_ids}
))
return cluster_docs
def _create_level_retrievers(self) -> Dict[str, Any]:
"""Create retrievers for each hierarchy level"""
retrievers = {}
# Level 1: Summary retriever
if self.document_hierarchy["level_1_summaries"]:
summary_vectorstore = Chroma.from_documents(
self.document_hierarchy["level_1_summaries"],
self.embeddings
)
retrievers["summary"] = summary_vectorstore.as_retriever(search_kwargs={"k": 5})
# Level 2: Cluster retriever
if self.document_hierarchy["level_2_clusters"]:
cluster_vectorstore = Chroma.from_documents(
self.document_hierarchy["level_2_clusters"],
self.embeddings
)
retrievers["cluster"] = cluster_vectorstore.as_retriever(search_kwargs={"k": 3})
# Level 3: Full document retriever
full_vectorstore = Chroma.from_documents(
self.document_hierarchy["level_3_full"],
self.embeddings
)
retrievers["full"] = full_vectorstore.as_retriever(search_kwargs={"k": 8})
return retrievers
def hierarchical_query(self, query: str) -> Dict[str, Any]:
"""Perform hierarchical retrieval"""
start_time = datetime.now()
results = {"query": query, "levels": {}}
# Level 1: Query summaries first
if "summary" in self.level_retrievers:
summary_docs = self.level_retrievers["summary"].get_relevant_documents(query)
results["levels"]["summary"] = {
"documents": summary_docs,
"count": len(summary_docs)
}
# Identify relevant full documents from summaries
relevant_doc_ids = []
for doc in summary_docs:
if "original_doc_id" in doc.metadata:
relevant_doc_ids.append(doc.metadata["original_doc_id"])
else:
relevant_doc_ids = list(range(len(self.document_hierarchy["level_3_full"])))
# Level 2: Query clusters for concept understanding
if "cluster" in self.level_retrievers:
cluster_docs = self.level_retrievers["cluster"].get_relevant_documents(query)
results["levels"]["cluster"] = {
"documents": cluster_docs,
"count": len(cluster_docs)
}
# Add documents from relevant clusters
for doc in cluster_docs:
if "doc_ids" in doc.metadata:
relevant_doc_ids.extend(doc.metadata["doc_ids"])
# Level 3: Retrieve full documents (filtered by hierarchy)
full_docs = self.level_retrievers["full"].get_relevant_documents(query)
# Filter to most relevant based on hierarchy
if relevant_doc_ids:
filtered_docs = []
for doc in full_docs:
if doc.metadata.get("doc_id") in relevant_doc_ids:
filtered_docs.append(doc)
# If filtering is too restrictive, include some direct matches
if len(filtered_docs) < 3:
filtered_docs.extend(full_docs[:3])
full_docs = filtered_docs[:6] # Limit to top 6
results["levels"]["full"] = {
"documents": full_docs,
"count": len(full_docs)
}
# Generate hierarchical answer
answer = self._generate_hierarchical_answer(query, results)
results["answer"] = answer
results["processing_time"] = (datetime.now() - start_time).total_seconds()
return results
def _generate_hierarchical_answer(self, query: str, retrieval_results: Dict[str, Any]) -> str:
"""Generate answer using hierarchical context"""
# Build hierarchical context
context_parts = []
# Add summary context
if "summary" in retrieval_results["levels"]:
summaries = retrieval_results["levels"]["summary"]["documents"]
if summaries:
context_parts.append("DOCUMENT SUMMARIES:")
for i, doc in enumerate(summaries):
context_parts.append(f"Summary {i+1}: {doc.page_content}")
# Add cluster context
if "cluster" in retrieval_results["levels"]:
clusters = retrieval_results["levels"]["cluster"]["documents"]
if clusters:
context_parts.append("\nRELATED CONCEPTS:")
for i, doc in enumerate(clusters):
context_parts.append(f"Concept {i+1}: {doc.page_content}")
# Add full document context
full_docs = retrieval_results["levels"]["full"]["documents"]
if full_docs:
context_parts.append("\nDETAILED INFORMATION:")
for i, doc in enumerate(full_docs):
context_parts.append(f"Document {i+1}: {doc.page_content[:300]}...")
context = "\n".join(context_parts)
# Generate answer
answer_prompt = ChatPromptTemplate.from_template("""
Answer the question using the hierarchical context provided.
Use information from summaries for overview, concepts for understanding, and detailed documents for specifics.
Question: {question}
Hierarchical Context:
{context}
Provide a comprehensive answer that integrates information from all levels:
""")
answer = (answer_prompt | self.llm | StrOutputParser()).invoke({
"question": query,
"context": context
})
return answer
# Demo hierarchical RAG
def demo_hierarchical_rag():
"""Demonstrate hierarchical RAG architecture"""
# Create more comprehensive document set
comprehensive_docs = [
"Artificial Intelligence (AI) is the simulation of human intelligence in machines. It includes machine learning, natural language processing, computer vision, and robotics.",
"Machine Learning is a subset of AI that enables computers to learn and improve from experience without being explicitly programmed. It includes supervised, unsupervised, and reinforcement learning.",
"Neural Networks are computing systems inspired by biological neural networks. They consist of layers of interconnected nodes that process information through weighted connections.",
"Deep Learning uses neural networks with multiple hidden layers to automatically learn hierarchical representations of data. It has revolutionized fields like image recognition and language processing.",
"Convolutional Neural Networks (CNNs) are specialized for processing grid-like data such as images. They use convolution operations to detect features like edges, shapes, and patterns.",
"Recurrent Neural Networks (RNNs) are designed for sequential data. They have memory capabilities that allow them to process sequences of varying lengths.",
"Transformers revolutionized natural language processing by using attention mechanisms instead of recurrence. They can process sequences in parallel, making them more efficient.",
"BERT (Bidirectional Encoder Representations from Transformers) is designed to understand context by looking at words from both directions in a sentence.",
"GPT (Generative Pre-trained Transformer) models are autoregressive language models that generate text by predicting the next word in a sequence.",
"Attention mechanisms allow models to focus on relevant parts of the input when making predictions. They are the key innovation in transformer architectures.",
"Transfer Learning involves taking a model trained on one task and adapting it to a related task. This approach has made AI more accessible and efficient.",
"Fine-tuning is a transfer learning technique where a pre-trained model is further trained on task-specific data to improve performance on that particular task."
]
from langchain_core.documents import Document
documents = [Document(page_content=doc) for doc in comprehensive_docs]
embeddings = OpenAIEmbeddings()
# Create hierarchical RAG system
hierarchical_rag = HierarchicalRAG(documents, embeddings)
print("ποΈ Hierarchical RAG Demo:")
print("=========================")
# Test hierarchical queries
test_queries = [
"What is the relationship between AI, machine learning, and deep learning?",
"How do transformers work and why are they important?",
"Compare different types of neural networks and their applications"
]
for i, query in enumerate(test_queries, 1):
print(f"\n--- Hierarchical Query {i} ---")
print(f"Query: {query}")
result = hierarchical_rag.hierarchical_query(query)
print(f"\nπ Retrieval Summary:")
for level, data in result["levels"].items():
print(f" {level.capitalize()}: {data['count']} documents")
print(f"Processing Time: {result['processing_time']:.2f}s")
print(f"\nπ― Hierarchical Answer:")
print(result["answer"][:400] + "...")
print("\n" + "="*60)
return hierarchical_rag
hierarchical_rag = demo_hierarchical_rag()

Feedback-Enhanced RAG
python
class FeedbackEnhancedRAG:
"""RAG system that learns from user feedback"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
self.feedback_store = []
self.query_patterns = {}
self.relevance_model = self._init_relevance_model()
def _init_relevance_model(self):
"""Initialize simple relevance scoring model"""
return {
"positive_patterns": set(),
"negative_patterns": set(),
"query_adjustments": {}
}
def query_with_feedback(self, query: str, user_id: str = "default") -> Dict[str, Any]:
"""Perform RAG query with feedback-based optimization"""
# Step 1: Apply learned query adjustments
optimized_query = self._optimize_query(query)
# Step 2: Retrieve with relevance scoring
retrieved_docs = self._retrieve_with_relevance(optimized_query)
# Step 3: Generate answer
answer = self._generate_answer(optimized_query, retrieved_docs)
# Step 4: Create feedback session
session_id = f"{user_id}_{int(datetime.now().timestamp())}"
result = {
"session_id": session_id,
"original_query": query,
"optimized_query": optimized_query,
"retrieved_docs": retrieved_docs,
"answer": answer,
"feedback_prompt": self._create_feedback_prompt()
}
return result
def _optimize_query(self, query: str) -> str:
"""Optimize query based on past feedback"""
# Check for known query patterns
for pattern, adjustment in self.relevance_model["query_adjustments"].items():
if pattern.lower() in query.lower():
optimized = query + " " + adjustment
print(f"π§ Query optimized: {query} β {optimized}")
return optimized
return query
def _retrieve_with_relevance(self, query: str, k: int = 6) -> List[Tuple[Document, float]]:
"""Retrieve documents with relevance scoring"""
# Get base retrieval
docs = self.vectorstore.similarity_search_with_score(query, k=k*2)
# Apply feedback-based relevance adjustment
scored_docs = []
for doc, score in docs:
adjusted_score = self._adjust_relevance_score(doc, query, score)
scored_docs.append((doc, adjusted_score))
# Sort ascending: similarity_search_with_score returns distances, so lower means more relevant
scored_docs.sort(key=lambda x: x[1])
return scored_docs[:k]
def _adjust_relevance_score(self, doc: Document, query: str, base_score: float) -> float:
"""Adjust relevance score based on feedback"""
adjusted_score = base_score
# Check against positive patterns
doc_text = doc.page_content.lower()
query_text = query.lower()
for pattern in self.relevance_model["positive_patterns"]:
if pattern in doc_text or pattern in query_text:
adjusted_score *= 0.9 # Lower score is better in similarity search
# Check against negative patterns
for pattern in self.relevance_model["negative_patterns"]:
if pattern in doc_text:
adjusted_score *= 1.1 # Higher score is worse
return adjusted_score
def _generate_answer(self, query: str, retrieved_docs: List[Tuple[Document, float]]) -> str:
"""Generate answer from retrieved documents"""
context = "\n\n".join([
f"Source {i+1} (relevance: {1/score:.2f}): {doc.page_content}"
for i, (doc, score) in enumerate(retrieved_docs)
])
answer_prompt = ChatPromptTemplate.from_template("""
Answer the question using the provided context. Consider the relevance scores when weighing information.
Question: {question}
Context with Relevance Scores:
{context}
Answer:""")
answer = (answer_prompt | self.llm | StrOutputParser()).invoke({
"question": query,
"context": context
})
return answer
def _create_feedback_prompt(self) -> str:
"""Create feedback collection prompt"""
return """
Please rate this response:
1. Was the answer helpful? (yes/no)
2. Was the information relevant? (1-5 scale)
3. What information was missing?
4. Were there any irrelevant parts?
Your feedback helps improve future responses!
"""
def collect_feedback(
self,
session_id: str,
helpful: bool,
relevance_rating: int,
missing_info: str = "",
irrelevant_parts: str = ""
):
"""Collect and process user feedback"""
feedback = {
"session_id": session_id,
"timestamp": datetime.now(),
"helpful": helpful,
"relevance_rating": relevance_rating,
"missing_info": missing_info,
"irrelevant_parts": irrelevant_parts
}
self.feedback_store.append(feedback)
# Update relevance model
self._update_relevance_model(feedback)
print(f"β
Feedback collected for session {session_id}")
return feedback
def _update_relevance_model(self, feedback: Dict[str, Any]):
"""Update relevance model based on feedback"""
# Extract patterns from feedback
if feedback["helpful"] and feedback["relevance_rating"] >= 4:
# Positive feedback - extract positive patterns
if feedback["missing_info"]:
# Add missing info as query adjustment
session = self._get_session(feedback["session_id"])
if session:
original_query = session["original_query"]
adjustment = feedback["missing_info"]
self.relevance_model["query_adjustments"][original_query] = adjustment
elif not feedback["helpful"] or feedback["relevance_rating"] <= 2:
# Negative feedback - extract negative patterns
if feedback["irrelevant_parts"]:
negative_keywords = self._extract_keywords(feedback["irrelevant_parts"])
self.relevance_model["negative_patterns"].update(negative_keywords)
def _get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""Get session data by ID"""
# In practice, you'd store sessions in a database
# For demo, we'll store the last session
if hasattr(self, 'last_session') and self.last_session.get("session_id") == session_id:
return self.last_session
return None
def _extract_keywords(self, text: str) -> set:
"""Extract keywords from text"""
import re
words = re.findall(r'\b\w+\b', text.lower())
return {word for word in words if len(word) > 3}
def get_feedback_analytics(self) -> Dict[str, Any]:
"""Get analytics on collected feedback"""
if not self.feedback_store:
return {"message": "No feedback collected yet"}
total_feedback = len(self.feedback_store)
helpful_count = sum(1 for f in self.feedback_store if f["helpful"])
avg_relevance = sum(f["relevance_rating"] for f in self.feedback_store) / total_feedback
return {
"total_feedback": total_feedback,
"helpfulness_rate": helpful_count / total_feedback,
"average_relevance": avg_relevance,
"positive_patterns": len(self.relevance_model["positive_patterns"]),
"negative_patterns": len(self.relevance_model["negative_patterns"]),
"query_adjustments": len(self.relevance_model["query_adjustments"])
}
# Demo feedback-enhanced RAG
def demo_feedback_enhanced_rag():
"""Demonstrate RAG system with feedback learning"""
# Reuse vectorstore from previous examples
docs = [
"Python is a programming language known for its simplicity and readability.",
"Machine learning algorithms can be implemented efficiently in Python.",
"Data science workflows often use Python libraries like pandas and numpy.",
"Web development in Python commonly uses frameworks like Django and Flask.",
"Python's extensive library ecosystem makes it versatile for many applications.",
"Object-oriented programming in Python uses classes and inheritance.",
"Python supports functional programming paradigms as well.",
"Performance optimization in Python can involve using Cython or PyPy."
]
from langchain_core.documents import Document
documents = [Document(page_content=doc) for doc in docs]
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)
# Create feedback-enhanced RAG
llm = ChatOpenAI(temperature=0.1)
feedback_rag = FeedbackEnhancedRAG(vectorstore, llm)
print("π Feedback-Enhanced RAG Demo:")
print("=============================")
# Simulate interaction cycle
queries = [
"What is Python good for?",
"How can I optimize Python performance?",
"What are Python frameworks?"
]
for i, query in enumerate(queries, 1):
print(f"\n--- Query {i}: {query} ---")
# Perform query
result = feedback_rag.query_with_feedback(query, user_id="demo_user")
feedback_rag.last_session = result # Store for demo
print(f"Answer: {result['answer'][:200]}...")
print(f"\nSession ID: {result['session_id']}")
# Simulate user feedback
if i == 1:
# Positive feedback
feedback_rag.collect_feedback(
session_id=result["session_id"],
helpful=True,
relevance_rating=4,
missing_info="libraries and frameworks",
irrelevant_parts=""
)
elif i == 2:
# Negative feedback
feedback_rag.collect_feedback(
session_id=result["session_id"],
helpful=False,
relevance_rating=2,
missing_info="specific optimization techniques",
irrelevant_parts="general programming concepts"
)
else:
# Mixed feedback
feedback_rag.collect_feedback(
session_id=result["session_id"],
helpful=True,
relevance_rating=3,
missing_info="",
irrelevant_parts="object-oriented programming"
)
print("Feedback collected β
")
# Show feedback analytics
print(f"\nπ Feedback Analytics:")
analytics = feedback_rag.get_feedback_analytics()
print(json.dumps(analytics, indent=2))
# Test improved query (should show optimization)
print(f"\nπ Testing Improved Query:")
final_result = feedback_rag.query_with_feedback("What is Python good for?", user_id="demo_user")
print(f"Optimized Query: {final_result['optimized_query']}")
return feedback_rag
feedback_rag = demo_feedback_enhanced_rag()

Next Steps
Ready to deploy production RAG systems? Continue with:
- Production Patterns - Scale and deploy RAG systems
- Security and Privacy - Secure RAG implementations
- Testing and Evaluation - Validate RAG performance
Key Advanced RAG Takeaways:
- Multi-hop reasoning enables complex question answering
- Adaptive strategies optimize retrieval for different query types
- Hierarchical architectures improve efficiency and relevance
- Feedback learning continuously improves system performance
- Production optimization requires careful architecture design
- User experience improves with intelligent retrieval patterns
- Monitoring and iteration are essential for RAG system success