Advanced RAG - Sophisticated Retrieval Patterns

Master advanced Retrieval-Augmented Generation techniques including multi-hop reasoning, adaptive retrieval, and production-scale RAG architectures

πŸš€ Understanding Advanced RAG

Advanced RAG goes beyond simple document retrieval to implement sophisticated reasoning patterns, adaptive strategies, and production-optimized architectures that can handle complex queries and large-scale knowledge bases.

🧠 Evolution of RAG Systems

text
                    🧠 RAG SYSTEM EVOLUTION 🧠
                   (From simple to sophisticated)

    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                      BASIC RAG                                 β”‚
    β”‚                   (Single-step retrieval)                      β”‚
    β”‚                                                                β”‚
    β”‚  Query β†’ Retrieve β†’ Generate β†’ Response                        β”‚
    β”‚                                                                β”‚
    β”‚  βœ… Simple implementation                                      β”‚
    β”‚  ❌ Limited reasoning capability                               β”‚
    β”‚  ❌ No query refinement                                        β”‚
    β”‚  ❌ Single information source                                  β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
                         β–Ό ADVANCED PATTERNS
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                    ADVANCED RAG                                β”‚
    β”‚              (Multi-step, adaptive, optimized)                 β”‚
    β”‚                                                                β”‚
    β”‚  Query β†’ Analyze β†’ Plan β†’ Multi-Retrieve β†’ Reason β†’ Synthesize β”‚
    β”‚    ↓        ↓       ↓           ↓            ↓          ↓      β”‚
    β”‚  Context  Route    Strategy  Sources        Evidence  Final    β”‚
    β”‚  Aware    Query    Select    Multiple       Chain     Answer   β”‚
    β”‚                                                                β”‚
    β”‚  βœ… Complex reasoning                                          β”‚
    β”‚  βœ… Adaptive strategies                                        β”‚
    β”‚  βœ… Multi-source synthesis                                     β”‚
    β”‚  βœ… Production optimized                                       β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
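
The basic pattern at the top of this diagram can be expressed in a few lines of LCEL. The sketch below is illustrative only; it assumes a `vectorstore` and `llm` configured the way the later examples configure them.

python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

def build_basic_rag(vectorstore, llm):
    """Query β†’ Retrieve β†’ Generate β†’ Response, as in the diagram above."""
    retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
    prompt = ChatPromptTemplate.from_template(
        "Answer the question using only the context below.\n\n"
        "Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"
    )

    def format_docs(docs):
        # Join retrieved documents into one context string
        return "\n\n".join(doc.page_content for doc in docs)

    return (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

# Usage sketch: build_basic_rag(vectorstore, llm).invoke("What is machine learning?")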

πŸ”„ Multi-Hop RAG Patterns

🎯 Sequential Reasoning RAG

python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Dict, List, Any, Optional
import json
from datetime import datetime

class MultiHopRAG:
    """RAG system that performs multi-step reasoning"""
    
    def __init__(self, vectorstore, llm, max_hops: int = 3):
        self.vectorstore = vectorstore
        self.llm = llm
        self.max_hops = max_hops
        self.reasoning_history = []
        
        # Prompts for different reasoning steps
        self.query_analyzer = self._create_query_analyzer()
        self.sub_query_generator = self._create_sub_query_generator()
        self.evidence_synthesizer = self._create_evidence_synthesizer()
        self.final_answerer = self._create_final_answerer()
    
    def _create_query_analyzer(self):
        """Create query analysis chain"""
        prompt = ChatPromptTemplate.from_template("""
        Analyze this query to determine if it requires multi-step reasoning:
        
        Query: {query}
        
        Determine:
        1. Is this a simple factual question or complex reasoning question?
        2. What are the key concepts that need to be researched?
        3. What is the logical sequence of sub-questions needed?
        4. How many reasoning steps are likely required?
        
        Respond in JSON format:
        {{
            "complexity": "simple|moderate|complex",
            "key_concepts": ["concept1", "concept2"],
            "reasoning_steps": ["step1", "step2"],
            "estimated_hops": 1-3
        }}
        """)
        
        return prompt | self.llm | StrOutputParser()
    
    def _create_sub_query_generator(self):
        """Create sub-query generation chain"""
        prompt = ChatPromptTemplate.from_template("""
        Based on the analysis and current context, generate the next sub-query:
        
        Original Query: {original_query}
        Analysis: {analysis}
        Current Step: {current_step}
        Previous Evidence: {previous_evidence}
        
        Generate a specific sub-query that will help answer the original question.
        The sub-query should be focused and retrievable from documents.
        
        Sub-query:""")
        
        return prompt | self.llm | StrOutputParser()
    
    def _create_evidence_synthesizer(self):
        """Create evidence synthesis chain"""
        prompt = ChatPromptTemplate.from_template("""
        Synthesize the retrieved evidence to answer the sub-query:
        
        Sub-query: {sub_query}
        Retrieved Documents:
        {documents}
        
        Provide:
        1. Direct answer to the sub-query
        2. Key evidence supporting the answer
        3. Confidence level (high/medium/low)
        4. Whether additional information is needed
        
        Synthesis:""")
        
        return prompt | self.llm | StrOutputParser()
    
    def _create_final_answerer(self):
        """Create final answer generation chain"""
        prompt = ChatPromptTemplate.from_template("""
        Generate a comprehensive answer using all gathered evidence:
        
        Original Query: {original_query}
        
        Evidence Chain:
        {evidence_chain}
        
        Instructions:
        1. Provide a complete answer to the original query
        2. Show logical reasoning steps
        3. Cite evidence from multiple sources
        4. Acknowledge any limitations or uncertainties
        
        Final Answer:""")
        
        return prompt | self.llm | StrOutputParser()
    
    def query(self, query: str) -> Dict[str, Any]:
        """Perform multi-hop reasoning query"""
        
        self.reasoning_history = []
        start_time = datetime.now()
        
        # Step 1: Analyze query complexity
        analysis_result = self.query_analyzer.invoke({"query": query})
        
        try:
            analysis = json.loads(analysis_result)
        except json.JSONDecodeError:
            # Fallback when the model does not return valid JSON
            analysis = {
                "complexity": "moderate",
                "key_concepts": [query],
                "reasoning_steps": ["basic_retrieval"],
                "estimated_hops": 1
            }
        
        print(f"πŸ“Š Query Analysis: {analysis}")
        
        # Step 2: Multi-hop reasoning
        evidence_chain = []
        current_query = query
        
        for hop in range(min(analysis.get("estimated_hops", 1), self.max_hops)):
            print(f"\nπŸ” Reasoning Hop {hop + 1}/{self.max_hops}")
            
            # Generate sub-query for current hop
            if hop > 0:
                sub_query = self.sub_query_generator.invoke({
                    "original_query": query,
                    "analysis": json.dumps(analysis),
                    "current_step": hop + 1,
                    "previous_evidence": json.dumps(evidence_chain)
                })
            else:
                sub_query = query
            
            print(f"Sub-query: {sub_query}")
            
            # Retrieve documents for sub-query
            retrieved_docs = self.vectorstore.similarity_search(sub_query, k=5)
            
            if not retrieved_docs:
                print(f"⚠️ No documents found for: {sub_query}")
                break
            
            # Format documents
            docs_text = "\n\n".join([
                f"Document {i+1}: {doc.page_content}"
                for i, doc in enumerate(retrieved_docs)
            ])
            
            # Synthesize evidence
            evidence = self.evidence_synthesizer.invoke({
                "sub_query": sub_query,
                "documents": docs_text
            })
            
            evidence_entry = {
                "hop": hop + 1,
                "sub_query": sub_query,
                "retrieved_docs": len(retrieved_docs),
                "evidence": evidence,
                "timestamp": datetime.now()
            }
            
            evidence_chain.append(evidence_entry)
            self.reasoning_history.append(evidence_entry)
            
            print(f"Evidence: {evidence[:200]}...")
            
            # Check if we have enough information
            if "no additional information" in evidence.lower() or hop >= self.max_hops - 1:
                break
        
        # Step 3: Generate final answer
        evidence_chain_text = "\n\n".join([
            f"Step {e['hop']}: {e['sub_query']}\nEvidence: {e['evidence']}"
            for e in evidence_chain
        ])
        
        final_answer = self.final_answerer.invoke({
            "original_query": query,
            "evidence_chain": evidence_chain_text
        })
        
        total_time = (datetime.now() - start_time).total_seconds()
        
        return {
            "query": query,
            "analysis": analysis,
            "reasoning_hops": len(evidence_chain),
            "evidence_chain": evidence_chain,
            "final_answer": final_answer,
            "processing_time": total_time,
            "total_documents_retrieved": sum(e["retrieved_docs"] for e in evidence_chain)
        }

# Demo multi-hop RAG
def demo_multi_hop_rag():
    """Demonstrate multi-hop reasoning RAG"""
    
    # Create sample knowledge base
    sample_docs = [
        "Machine learning is a subset of artificial intelligence that uses algorithms to learn patterns from data.",
        "Neural networks are computing systems inspired by biological neural networks. They consist of layers of interconnected nodes.",
        "Deep learning uses neural networks with multiple hidden layers to learn complex patterns.",
        "Transformers are a type of neural network architecture that uses attention mechanisms for processing sequences.",
        "GPT (Generative Pre-trained Transformer) models are built on the transformer architecture.",
        "Large language models like GPT-3 and GPT-4 are trained on vast amounts of text data.",
        "Transfer learning allows models pre-trained on one task to be adapted for related tasks.",
        "Fine-tuning is a form of transfer learning where a pre-trained model is further trained on specific data.",
        "Prompt engineering involves designing effective inputs to get desired outputs from language models.",
        "Few-shot learning enables models to perform tasks with only a few examples.",
        "BERT (Bidirectional Encoder Representations from Transformers) is designed for understanding context.",
        "Attention mechanisms allow models to focus on relevant parts of the input when making predictions."
    ]
    
    # Create vector store
    from langchain_core.documents import Document
    
    documents = [Document(page_content=doc) for doc in sample_docs]
    
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma.from_documents(documents, embeddings)
    
    # Create multi-hop RAG system
    llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo")
    multi_hop_rag = MultiHopRAG(vectorstore, llm, max_hops=3)
    
    print("πŸ”„ Multi-Hop RAG Demo:")
    print("======================")
    
    # Test complex questions requiring multi-step reasoning
    complex_queries = [
        "How does the architecture of transformers relate to the capabilities of large language models like GPT?",
        "What is the connection between neural networks, deep learning, and modern AI applications?",
        "How do attention mechanisms in transformers enable few-shot learning capabilities?"
    ]
    
    for i, query in enumerate(complex_queries, 1):
        print(f"\n--- Complex Query {i} ---")
        print(f"Query: {query}")
        
        result = multi_hop_rag.query(query)
        
        print(f"\nπŸ“Š Results Summary:")
        print(f"Reasoning hops: {result['reasoning_hops']}")
        print(f"Documents retrieved: {result['total_documents_retrieved']}")
        print(f"Processing time: {result['processing_time']:.2f}s")
        
        print(f"\n🎯 Final Answer:")
        print(result['final_answer'])
        
        print("\n" + "="*60)
    
    return multi_hop_rag

multi_hop_rag = demo_multi_hop_rag()
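
A fragile point in the pipeline above is parsing the analyzer's output with `json.loads`. A hedged alternative (a sketch, not part of `MultiHopRAG`) is LangChain's `JsonOutputParser`, which returns a dict directly and tolerates JSON wrapped in markdown code fences:

python
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate

def create_json_query_analyzer(llm):
    # Same analysis task as MultiHopRAG._create_query_analyzer, but the parser
    # yields a dict and raises OutputParserException on invalid JSON, so callers
    # can catch a specific exception instead of falling back silently.
    prompt = ChatPromptTemplate.from_template(
        "Analyze this query and respond ONLY with a JSON object containing "
        '"complexity", "key_concepts", "reasoning_steps", and "estimated_hops".\n\n'
        "Query: {query}"
    )
    return prompt | llm | JsonOutputParser()

# Usage sketch: analysis = create_json_query_analyzer(llm).invoke({"query": "..."})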

🎯 Adaptive Retrieval Strategy

python
class AdaptiveRAG:
    """RAG system that adapts retrieval strategy based on query type"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        
        # Different retrieval strategies
        self.strategies = {
            "factual": self._factual_strategy,
            "analytical": self._analytical_strategy,
            "comparative": self._comparative_strategy,
            "synthetic": self._synthetic_strategy
        }
        
        self.query_classifier = self._create_query_classifier()
        self.strategy_selector = self._create_strategy_selector()
    
    def _create_query_classifier(self):
        """Create query classification chain"""
        prompt = ChatPromptTemplate.from_template("""
        Classify this query into one of these categories:
        
        Query: {query}
        
        Categories:
        1. factual: Direct factual questions (What is X? When did Y happen?)
        2. analytical: Questions requiring analysis (Why does X work? How does Y affect Z?)
        3. comparative: Questions comparing entities (What's the difference between X and Y?)
        4. synthetic: Questions requiring synthesis of multiple concepts
        
        Consider:
        - Question words (what, how, why, compare, etc.)
        - Complexity of reasoning required
        - Number of concepts involved
        
        Respond with just the category name: factual, analytical, comparative, or synthetic
        """)
        
        return prompt | self.llm | StrOutputParser()
    
    def _create_strategy_selector(self):
        """Create strategy selection chain"""
        prompt = ChatPromptTemplate.from_template("""
        Based on the query type, select the optimal retrieval strategy:
        
        Query: {query}
        Query Type: {query_type}
        
        Available strategies:
        1. factual: Single-step retrieval with exact matching
        2. analytical: Multi-step retrieval with reasoning chains
        3. comparative: Parallel retrieval of compared entities
        4. synthetic: Broad retrieval with concept synthesis
        
        Strategy parameters to consider:
        - Number of retrieval steps
        - Number of documents per step
        - Similarity threshold
        - Document diversity requirements
        
        Provide strategy configuration in JSON:
        {{
            "strategy": "strategy_name",
            "parameters": {{
                "retrieval_steps": 1-3,
                "docs_per_step": 3-8,
                "similarity_threshold": 0.7-0.9,
                "diversity_weight": 0.0-1.0
            }}
        }}
        """)
        
        return prompt | self.llm | StrOutputParser()
    
    def _factual_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Simple factual retrieval strategy"""
        docs = self.vectorstore.similarity_search(
            query, 
            k=params.get("docs_per_step", 3)
        )
        
        return [{
            "step": 1,
            "query": query,
            "documents": docs,
            "strategy": "direct_similarity"
        }]
    
    def _analytical_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Multi-step analytical retrieval"""
        results = []
        
        # Step 1: Get basic information
        basic_docs = self.vectorstore.similarity_search(query, k=3)
        results.append({
            "step": 1,
            "query": query,
            "documents": basic_docs,
            "strategy": "basic_facts"
        })
        
        # Step 2: Get detailed analysis
        analysis_query = f"detailed analysis explanation reasoning {query}"
        analysis_docs = self.vectorstore.similarity_search(analysis_query, k=4)
        results.append({
            "step": 2,
            "query": analysis_query,
            "documents": analysis_docs,
            "strategy": "detailed_analysis"
        })
        
        return results
    
    def _comparative_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Parallel comparative retrieval"""
        results = []
        
        # Extract entities to compare (simplified)
        comparison_prompt = f"""
        Extract the main entities being compared in this query: {query}
        List them as: entity1, entity2, ...
        """
        
        entities_result = self.llm.invoke(comparison_prompt).content
        entities = [e.strip() for e in entities_result.split(',')]
        
        # Retrieve for each entity
        for i, entity in enumerate(entities[:3], 1):  # Limit to 3 entities
            entity_docs = self.vectorstore.similarity_search(entity, k=3)
            results.append({
                "step": i,
                "query": entity,
                "documents": entity_docs,
                "strategy": f"entity_comparison_{entity}"
            })
        
        return results
    
    def _synthetic_strategy(self, query: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Broad synthesis retrieval"""
        results = []
        
        # Step 1: Broad concept retrieval
        broad_docs = self.vectorstore.similarity_search(query, k=6)
        results.append({
            "step": 1,
            "query": query,
            "documents": broad_docs,
            "strategy": "broad_concepts"
        })
        
        # Step 2: Related concepts with MMR for diversity
        if hasattr(self.vectorstore, 'max_marginal_relevance_search'):
            diverse_docs = self.vectorstore.max_marginal_relevance_search(
                query, k=4, fetch_k=10
            )
            results.append({
                "step": 2,
                "query": f"related concepts {query}",
                "documents": diverse_docs,
                "strategy": "diverse_related"
            })
        
        return results
    
    def query(self, query: str) -> Dict[str, Any]:
        """Perform adaptive RAG query"""
        start_time = datetime.now()
        
        # Step 1: Classify query
        query_type = self.query_classifier.invoke({"query": query}).strip().lower()
        print(f"🏷️ Query Type: {query_type}")
        
        # Step 2: Select strategy
        try:
            strategy_config = self.strategy_selector.invoke({
                "query": query,
                "query_type": query_type
            })
            strategy_data = json.loads(strategy_config)
        except json.JSONDecodeError:
            # Fallback: map the classified query type directly to a strategy
            strategy_data = {
                "strategy": query_type if query_type in self.strategies else "factual",
                "parameters": {
                    "retrieval_steps": 1,
                    "docs_per_step": 4,
                    "similarity_threshold": 0.8,
                    "diversity_weight": 0.3
                }
            }
        
        print(f"🎯 Strategy: {strategy_data}")
        
        # Step 3: Execute retrieval strategy
        strategy_name = strategy_data["strategy"]
        if strategy_name not in self.strategies:
            strategy_name = "factual"  # Fallback
        
        retrieval_results = self.strategies[strategy_name](
            query, 
            strategy_data.get("parameters", {})
        )
        
        # Step 4: Generate response
        all_docs = []
        for result in retrieval_results:
            all_docs.extend(result["documents"])
        
        # Remove duplicates
        unique_docs = []
        seen_content = set()
        for doc in all_docs:
            content_hash = hash(doc.page_content[:100])
            if content_hash not in seen_content:
                seen_content.add(content_hash)
                unique_docs.append(doc)
        
        # Format context
        context = "\n\n".join([
            f"Source {i+1}: {doc.page_content}"
            for i, doc in enumerate(unique_docs[:8])
        ])
        
        # Generate answer
        answer_prompt = ChatPromptTemplate.from_template("""
        Answer the query using the provided context, adapting your response style to the query type.
        
        Query Type: {query_type}
        Query: {query}
        Strategy Used: {strategy}
        
        Context:
        {context}
        
        Instructions based on query type:
        - factual: Provide direct, concise answers with citations
        - analytical: Explain reasoning and mechanisms
        - comparative: Highlight similarities and differences
        - synthetic: Integrate multiple concepts coherently
        
        Answer:""")
        
        answer = (answer_prompt | self.llm | StrOutputParser()).invoke({
            "query_type": query_type,
            "query": query,
            "strategy": strategy_data["strategy"],
            "context": context
        })
        
        processing_time = (datetime.now() - start_time).total_seconds()
        
        return {
            "query": query,
            "query_type": query_type,
            "strategy": strategy_data,
            "retrieval_results": retrieval_results,
            "unique_documents": len(unique_docs),
            "answer": answer,
            "processing_time": processing_time
        }

# Demo adaptive RAG
def demo_adaptive_rag():
    """Demonstrate adaptive RAG strategies"""
    
    # Reuse the vectorstore from previous demo
    sample_docs = [
        "Machine learning algorithms learn patterns from data to make predictions or decisions.",
        "Supervised learning uses labeled data to train models, while unsupervised learning finds patterns in unlabeled data.",
        "Neural networks consist of interconnected nodes organized in layers that process information.",
        "Deep learning uses neural networks with many layers to learn complex representations.",
        "Convolutional Neural Networks (CNNs) are excellent for image processing and computer vision.",
        "Recurrent Neural Networks (RNNs) are designed for sequential data like text and time series.",
        "Transformers use attention mechanisms to process sequences more efficiently than RNNs.",
        "BERT and GPT are transformer-based models with different architectures and training objectives.",
        "Reinforcement learning trains agents to make decisions through trial and error with rewards.",
        "Transfer learning adapts pre-trained models to new tasks with limited data.",
        "Overfitting occurs when models memorize training data but fail to generalize to new data.",
        "Regularization techniques help prevent overfitting and improve model generalization."
    ]
    
    from langchain_core.documents import Document
    
    documents = [Document(page_content=doc) for doc in sample_docs]
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma.from_documents(documents, embeddings)
    
    # Create adaptive RAG system
    llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo")
    adaptive_rag = AdaptiveRAG(vectorstore, llm)
    
    print("🎯 Adaptive RAG Demo:")
    print("====================")
    
    # Test different query types
    test_queries = [
        ("What is machine learning?", "factual"),
        ("How do neural networks learn patterns from data?", "analytical"),
        ("What's the difference between CNNs and RNNs?", "comparative"),
        ("How do transformers, attention mechanisms, and transfer learning work together in modern AI?", "synthetic")
    ]
    
    for i, (query, expected_type) in enumerate(test_queries, 1):
        print(f"\n--- Query {i} ---")
        print(f"Query: {query}")
        print(f"Expected Type: {expected_type}")
        
        result = adaptive_rag.query(query)
        
        print(f"Detected Type: {result['query_type']}")
        print(f"Strategy: {result['strategy']['strategy']}")
        print(f"Documents: {result['unique_documents']}")
        print(f"Processing Time: {result['processing_time']:.2f}s")
        
        print(f"\nAnswer: {result['answer'][:300]}...")
        
        print("\n" + "="*50)
    
    return adaptive_rag

adaptive_rag = demo_adaptive_rag()
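
The dictionary dispatch inside `AdaptiveRAG.query` can also be written as an LCEL router. The sketch below shows one hedged alternative using `RunnableBranch`; it is not part of the class above, and it calls the strategy methods with empty parameter dicts for brevity:

python
from langchain_core.runnables import RunnableBranch, RunnableLambda

def build_strategy_router(adaptive_rag):
    # Classify the query first, then route it to the matching strategy.
    classify = RunnableLambda(lambda q: {
        "query": q,
        "qtype": adaptive_rag.query_classifier.invoke({"query": q}).strip().lower(),
    })
    route = RunnableBranch(
        (lambda x: x["qtype"] == "analytical",
         lambda x: adaptive_rag._analytical_strategy(x["query"], {})),
        (lambda x: x["qtype"] == "comparative",
         lambda x: adaptive_rag._comparative_strategy(x["query"], {})),
        (lambda x: x["qtype"] == "synthetic",
         lambda x: adaptive_rag._synthetic_strategy(x["query"], {})),
        # Default branch: factual single-step retrieval
        lambda x: adaptive_rag._factual_strategy(x["query"], {}),
    )
    return classify | route

# Usage sketch: retrieval_results = build_strategy_router(adaptive_rag).invoke(query)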

πŸ”§ Production RAG Optimizations

⚑ Hierarchical RAG Architecture

python
from typing import Tuple

class HierarchicalRAG:
    """Multi-level RAG system with hierarchical document organization"""
    
    def __init__(self, documents: List[Document], embeddings):
        self.embeddings = embeddings
        self.llm = ChatOpenAI(temperature=0.1)
        
        # Build hierarchical structure
        self.document_hierarchy = self._build_hierarchy(documents)
        self.level_retrievers = self._create_level_retrievers()
        
    def _build_hierarchy(self, documents: List[Document]) -> Dict[str, Any]:
        """Build hierarchical document structure"""
        
        # Level 1: Document summaries
        summaries = []
        for doc in documents:
            summary_prompt = f"""
            Create a concise summary of this document content:
            
            {doc.page_content[:500]}...
            
            Summary (1-2 sentences):"""
            
            summary = self.llm.invoke(summary_prompt).content
            summaries.append(Document(
                page_content=summary,
                metadata={**doc.metadata, "level": "summary", "original_doc_id": len(summaries)}
            ))
        
        # Level 2: Concept clusters
        clusters = self._create_concept_clusters(documents)
        
        # Level 3: Original documents
        full_docs = [
            Document(
                page_content=doc.page_content,
                metadata={**doc.metadata, "level": "full", "doc_id": i}
            )
            for i, doc in enumerate(documents)
        ]
        
        return {
            "level_1_summaries": summaries,
            "level_2_clusters": clusters,
            "level_3_full": full_docs
        }
    
    def _create_concept_clusters(self, documents: List[Document]) -> List[Document]:
        """Create concept-based document clusters"""
        
        # Simple clustering based on keyword overlap
        clusters = {}
        
        for i, doc in enumerate(documents):
            # Extract key concepts (simplified)
            concepts_prompt = f"""
            Extract 3-5 key concepts from this text:
            
            {doc.page_content[:300]}...
            
            Concepts (comma-separated):"""
            
            concepts_text = self.llm.invoke(concepts_prompt).content
            concepts = [c.strip().lower() for c in concepts_text.split(',')]
            
            # Assign to clusters
            for concept in concepts:
                if concept not in clusters:
                    clusters[concept] = []
                clusters[concept].append(i)
        
        # Create cluster documents
        cluster_docs = []
        for concept, doc_ids in clusters.items():
            if len(doc_ids) > 1:  # Only multi-document clusters
                cluster_content = f"Concept: {concept}\nRelated documents: {len(doc_ids)}\n"
                cluster_content += f"Document IDs: {doc_ids}"
                
                cluster_docs.append(Document(
                    page_content=cluster_content,
                    metadata={"level": "cluster", "concept": concept, "doc_ids": doc_ids}
                ))
        
        return cluster_docs
    
    def _create_level_retrievers(self) -> Dict[str, Any]:
        """Create retrievers for each hierarchy level"""
        retrievers = {}
        
        # Level 1: Summary retriever
        if self.document_hierarchy["level_1_summaries"]:
            summary_vectorstore = Chroma.from_documents(
                self.document_hierarchy["level_1_summaries"],
                self.embeddings
            )
            retrievers["summary"] = summary_vectorstore.as_retriever(search_kwargs={"k": 5})
        
        # Level 2: Cluster retriever
        if self.document_hierarchy["level_2_clusters"]:
            cluster_vectorstore = Chroma.from_documents(
                self.document_hierarchy["level_2_clusters"],
                self.embeddings
            )
            retrievers["cluster"] = cluster_vectorstore.as_retriever(search_kwargs={"k": 3})
        
        # Level 3: Full document retriever
        full_vectorstore = Chroma.from_documents(
            self.document_hierarchy["level_3_full"],
            self.embeddings
        )
        retrievers["full"] = full_vectorstore.as_retriever(search_kwargs={"k": 8})
        
        return retrievers
    
    def hierarchical_query(self, query: str) -> Dict[str, Any]:
        """Perform hierarchical retrieval"""
        start_time = datetime.now()
        results = {"query": query, "levels": {}}
        
        # Level 1: Query summaries first
        if "summary" in self.level_retrievers:
            summary_docs = self.level_retrievers["summary"].get_relevant_documents(query)
            results["levels"]["summary"] = {
                "documents": summary_docs,
                "count": len(summary_docs)
            }
            
            # Identify relevant full documents from summaries
            relevant_doc_ids = []
            for doc in summary_docs:
                if "original_doc_id" in doc.metadata:
                    relevant_doc_ids.append(doc.metadata["original_doc_id"])
        else:
            relevant_doc_ids = list(range(len(self.document_hierarchy["level_3_full"])))
        
        # Level 2: Query clusters for concept understanding
        if "cluster" in self.level_retrievers:
            cluster_docs = self.level_retrievers["cluster"].get_relevant_documents(query)
            results["levels"]["cluster"] = {
                "documents": cluster_docs,
                "count": len(cluster_docs)
            }
            
            # Add documents from relevant clusters
            for doc in cluster_docs:
                if "doc_ids" in doc.metadata:
                    relevant_doc_ids.extend(doc.metadata["doc_ids"])
        
        # Level 3: Retrieve full documents (filtered by hierarchy)
        full_docs = self.level_retrievers["full"].get_relevant_documents(query)
        
        # Filter to most relevant based on hierarchy
        if relevant_doc_ids:
            filtered_docs = []
            for doc in full_docs:
                if doc.metadata.get("doc_id") in relevant_doc_ids:
                    filtered_docs.append(doc)
            
            # If filtering is too restrictive, include some direct matches
            if len(filtered_docs) < 3:
                filtered_docs.extend(full_docs[:3])
            
            full_docs = filtered_docs[:6]  # Limit to top 6
        
        results["levels"]["full"] = {
            "documents": full_docs,
            "count": len(full_docs)
        }
        
        # Generate hierarchical answer
        answer = self._generate_hierarchical_answer(query, results)
        results["answer"] = answer
        results["processing_time"] = (datetime.now() - start_time).total_seconds()
        
        return results
    
    def _generate_hierarchical_answer(self, query: str, retrieval_results: Dict[str, Any]) -> str:
        """Generate answer using hierarchical context"""
        
        # Build hierarchical context
        context_parts = []
        
        # Add summary context
        if "summary" in retrieval_results["levels"]:
            summaries = retrieval_results["levels"]["summary"]["documents"]
            if summaries:
                context_parts.append("DOCUMENT SUMMARIES:")
                for i, doc in enumerate(summaries):
                    context_parts.append(f"Summary {i+1}: {doc.page_content}")
        
        # Add cluster context
        if "cluster" in retrieval_results["levels"]:
            clusters = retrieval_results["levels"]["cluster"]["documents"]
            if clusters:
                context_parts.append("\nRELATED CONCEPTS:")
                for i, doc in enumerate(clusters):
                    context_parts.append(f"Concept {i+1}: {doc.page_content}")
        
        # Add full document context
        full_docs = retrieval_results["levels"]["full"]["documents"]
        if full_docs:
            context_parts.append("\nDETAILED INFORMATION:")
            for i, doc in enumerate(full_docs):
                context_parts.append(f"Document {i+1}: {doc.page_content[:300]}...")
        
        context = "\n".join(context_parts)
        
        # Generate answer
        answer_prompt = ChatPromptTemplate.from_template("""
        Answer the question using the hierarchical context provided.
        Use information from summaries for overview, concepts for understanding, and detailed documents for specifics.
        
        Question: {question}
        
        Hierarchical Context:
        {context}
        
        Provide a comprehensive answer that integrates information from all levels:
        """)
        
        answer = (answer_prompt | self.llm | StrOutputParser()).invoke({
            "question": query,
            "context": context
        })
        
        return answer

# Demo hierarchical RAG
def demo_hierarchical_rag():
    """Demonstrate hierarchical RAG architecture"""
    
    # Create more comprehensive document set
    comprehensive_docs = [
        "Artificial Intelligence (AI) is the simulation of human intelligence in machines. It includes machine learning, natural language processing, computer vision, and robotics.",
        "Machine Learning is a subset of AI that enables computers to learn and improve from experience without being explicitly programmed. It includes supervised, unsupervised, and reinforcement learning.",
        "Neural Networks are computing systems inspired by biological neural networks. They consist of layers of interconnected nodes that process information through weighted connections.",
        "Deep Learning uses neural networks with multiple hidden layers to automatically learn hierarchical representations of data. It has revolutionized fields like image recognition and language processing.",
        "Convolutional Neural Networks (CNNs) are specialized for processing grid-like data such as images. They use convolution operations to detect features like edges, shapes, and patterns.",
        "Recurrent Neural Networks (RNNs) are designed for sequential data. They have memory capabilities that allow them to process sequences of varying lengths.",
        "Transformers revolutionized natural language processing by using attention mechanisms instead of recurrence. They can process sequences in parallel, making them more efficient.",
        "BERT (Bidirectional Encoder Representations from Transformers) is designed to understand context by looking at words from both directions in a sentence.",
        "GPT (Generative Pre-trained Transformer) models are autoregressive language models that generate text by predicting the next word in a sequence.",
        "Attention mechanisms allow models to focus on relevant parts of the input when making predictions. They are the key innovation in transformer architectures.",
        "Transfer Learning involves taking a model trained on one task and adapting it to a related task. This approach has made AI more accessible and efficient.",
        "Fine-tuning is a transfer learning technique where a pre-trained model is further trained on task-specific data to improve performance on that particular task."
    ]
    
    from langchain_core.documents import Document
    
    documents = [Document(page_content=doc) for doc in comprehensive_docs]
    embeddings = OpenAIEmbeddings()
    
    # Create hierarchical RAG system
    hierarchical_rag = HierarchicalRAG(documents, embeddings)
    
    print("πŸ—οΈ Hierarchical RAG Demo:")
    print("=========================")
    
    # Test hierarchical queries
    test_queries = [
        "What is the relationship between AI, machine learning, and deep learning?",
        "How do transformers work and why are they important?",
        "Compare different types of neural networks and their applications"
    ]
    
    for i, query in enumerate(test_queries, 1):
        print(f"\n--- Hierarchical Query {i} ---")
        print(f"Query: {query}")
        
        result = hierarchical_rag.hierarchical_query(query)
        
        print(f"\nπŸ“Š Retrieval Summary:")
        for level, data in result["levels"].items():
            print(f"  {level.capitalize()}: {data['count']} documents")
        
        print(f"Processing Time: {result['processing_time']:.2f}s")
        
        print(f"\n🎯 Hierarchical Answer:")
        print(result["answer"][:400] + "...")
        
        print("\n" + "="*60)
    
    return hierarchical_rag

hierarchical_rag = demo_hierarchical_rag()
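
Building the hierarchy is expensive: every document costs at least two LLM calls (summary and concept extraction) plus embeddings, so in production you would persist each level instead of rebuilding it on startup. A minimal sketch, assuming Chroma's `persist_directory` option; the directory layout here is an assumption for illustration:

python
import os
from langchain_community.vectorstores import Chroma

def load_or_build_level(docs, embeddings, persist_dir):
    # Reuse a previously persisted collection when one exists; otherwise
    # embed the documents and persist them for the next startup.
    if os.path.isdir(persist_dir) and os.listdir(persist_dir):
        return Chroma(persist_directory=persist_dir, embedding_function=embeddings)
    return Chroma.from_documents(docs, embeddings, persist_directory=persist_dir)

# Usage sketch inside _create_level_retrievers:
# summary_vectorstore = load_or_build_level(
#     self.document_hierarchy["level_1_summaries"], self.embeddings, "rag_index/summaries"
# )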

πŸ”„ Feedback-Enhanced RAG

python
class FeedbackEnhancedRAG:
    """RAG system that learns from user feedback"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        self.feedback_store = []
        self.query_patterns = {}
        self.relevance_model = self._init_relevance_model()
    
    def _init_relevance_model(self):
        """Initialize simple relevance scoring model"""
        return {
            "positive_patterns": set(),
            "negative_patterns": set(),
            "query_adjustments": {}
        }
    
    def query_with_feedback(self, query: str, user_id: str = "default") -> Dict[str, Any]:
        """Perform RAG query with feedback-based optimization"""
        
        # Step 1: Apply learned query adjustments
        optimized_query = self._optimize_query(query)
        
        # Step 2: Retrieve with relevance scoring
        retrieved_docs = self._retrieve_with_relevance(optimized_query)
        
        # Step 3: Generate answer
        answer = self._generate_answer(optimized_query, retrieved_docs)
        
        # Step 4: Create feedback session
        session_id = f"{user_id}_{int(datetime.now().timestamp())}"
        
        result = {
            "session_id": session_id,
            "original_query": query,
            "optimized_query": optimized_query,
            "retrieved_docs": retrieved_docs,
            "answer": answer,
            "feedback_prompt": self._create_feedback_prompt()
        }
        
        return result
    
    def _optimize_query(self, query: str) -> str:
        """Optimize query based on past feedback"""
        
        # Check for known query patterns
        for pattern, adjustment in self.relevance_model["query_adjustments"].items():
            if pattern.lower() in query.lower():
                optimized = query + " " + adjustment
                print(f"πŸ”§ Query optimized: {query} β†’ {optimized}")
                return optimized
        
        return query
    
    def _retrieve_with_relevance(self, query: str, k: int = 6) -> List[Tuple[Document, float]]:
        """Retrieve documents with relevance scoring"""
        
        # Get base retrieval
        docs = self.vectorstore.similarity_search_with_score(query, k=k*2)
        
        # Apply feedback-based relevance adjustment
        scored_docs = []
        for doc, score in docs:
            adjusted_score = self._adjust_relevance_score(doc, query, score)
            scored_docs.append((doc, adjusted_score))
        
        # Sort ascending and keep top k: scores are distances, so lower is more relevant
        scored_docs.sort(key=lambda x: x[1])
        return scored_docs[:k]
    
    def _adjust_relevance_score(self, doc: Document, query: str, base_score: float) -> float:
        """Adjust relevance score based on feedback"""
        
        adjusted_score = base_score
        
        # Check against positive patterns
        doc_text = doc.page_content.lower()
        query_text = query.lower()
        
        for pattern in self.relevance_model["positive_patterns"]:
            if pattern in doc_text or pattern in query_text:
                adjusted_score *= 0.9  # Lower score is better in similarity search
        
        # Check against negative patterns
        for pattern in self.relevance_model["negative_patterns"]:
            if pattern in doc_text:
                adjusted_score *= 1.1  # Higher score is worse
        
        return adjusted_score
    
    def _generate_answer(self, query: str, retrieved_docs: List[Tuple[Document, float]]) -> str:
        """Generate answer from retrieved documents"""
        
        context = "\n\n".join([
            f"Source {i+1} (relevance: {1 / (score + 1e-6):.2f}): {doc.page_content}"
            for i, (doc, score) in enumerate(retrieved_docs)
        ])
        
        answer_prompt = ChatPromptTemplate.from_template("""
        Answer the question using the provided context. Consider the relevance scores when weighing information.
        
        Question: {question}
        
        Context with Relevance Scores:
        {context}
        
        Answer:""")
        
        answer = (answer_prompt | self.llm | StrOutputParser()).invoke({
            "question": query,
            "context": context
        })
        
        return answer
    
    def _create_feedback_prompt(self) -> str:
        """Create feedback collection prompt"""
        return """
        Please rate this response:
        1. Was the answer helpful? (yes/no)
        2. Was the information relevant? (1-5 scale)
        3. What information was missing?
        4. Were there any irrelevant parts?
        
        Your feedback helps improve future responses!
        """
    
    def collect_feedback(
        self, 
        session_id: str, 
        helpful: bool, 
        relevance_rating: int, 
        missing_info: str = "", 
        irrelevant_parts: str = ""
    ):
        """Collect and process user feedback"""
        
        feedback = {
            "session_id": session_id,
            "timestamp": datetime.now(),
            "helpful": helpful,
            "relevance_rating": relevance_rating,
            "missing_info": missing_info,
            "irrelevant_parts": irrelevant_parts
        }
        
        self.feedback_store.append(feedback)
        
        # Update relevance model
        self._update_relevance_model(feedback)
        
        print(f"βœ… Feedback collected for session {session_id}")
        return feedback
    
    def _update_relevance_model(self, feedback: Dict[str, Any]):
        """Update relevance model based on feedback"""
        
        # Extract patterns from feedback
        if feedback["helpful"] and feedback["relevance_rating"] >= 4:
            # Positive feedback: note anything the user still found missing
            if feedback["missing_info"]:
                # Store the missing info as a query adjustment for similar future queries
                session = self._get_session(feedback["session_id"])
                if session:
                    original_query = session["original_query"]
                    adjustment = feedback["missing_info"]
                    self.relevance_model["query_adjustments"][original_query] = adjustment
        
        elif not feedback["helpful"] or feedback["relevance_rating"] <= 2:
            # Negative feedback - extract negative patterns
            if feedback["irrelevant_parts"]:
                negative_keywords = self._extract_keywords(feedback["irrelevant_parts"])
                self.relevance_model["negative_patterns"].update(negative_keywords)
    
    def _get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session data by ID"""
        # In practice, you'd store sessions in a database
        # For demo, we'll store the last session
        if hasattr(self, 'last_session') and self.last_session.get("session_id") == session_id:
            return self.last_session
        return None
    
    def _extract_keywords(self, text: str) -> set:
        """Extract keywords from text"""
        import re
        words = re.findall(r'\b\w+\b', text.lower())
        return {word for word in words if len(word) > 3}
    
    def get_feedback_analytics(self) -> Dict[str, Any]:
        """Get analytics on collected feedback"""
        
        if not self.feedback_store:
            return {"message": "No feedback collected yet"}
        
        total_feedback = len(self.feedback_store)
        helpful_count = sum(1 for f in self.feedback_store if f["helpful"])
        avg_relevance = sum(f["relevance_rating"] for f in self.feedback_store) / total_feedback
        
        return {
            "total_feedback": total_feedback,
            "helpfulness_rate": helpful_count / total_feedback,
            "average_relevance": avg_relevance,
            "positive_patterns": len(self.relevance_model["positive_patterns"]),
            "negative_patterns": len(self.relevance_model["negative_patterns"]),
            "query_adjustments": len(self.relevance_model["query_adjustments"])
        }

# Demo feedback-enhanced RAG
def demo_feedback_enhanced_rag():
    """Demonstrate RAG system with feedback learning"""
    
    # Reuse vectorstore from previous examples
    docs = [
        "Python is a programming language known for its simplicity and readability.",
        "Machine learning algorithms can be implemented efficiently in Python.",
        "Data science workflows often use Python libraries like pandas and numpy.",
        "Web development in Python commonly uses frameworks like Django and Flask.",
        "Python's extensive library ecosystem makes it versatile for many applications.",
        "Object-oriented programming in Python uses classes and inheritance.",
        "Python supports functional programming paradigms as well.",
        "Performance optimization in Python can involve using Cython or PyPy."
    ]
    
    from langchain_core.documents import Document
    
    documents = [Document(page_content=doc) for doc in docs]
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma.from_documents(documents, embeddings)
    
    # Create feedback-enhanced RAG
    llm = ChatOpenAI(temperature=0.1)
    feedback_rag = FeedbackEnhancedRAG(vectorstore, llm)
    
    print("πŸ”„ Feedback-Enhanced RAG Demo:")
    print("=============================")
    
    # Simulate interaction cycle
    queries = [
        "What is Python good for?",
        "How can I optimize Python performance?",
        "What are Python frameworks?"
    ]
    
    for i, query in enumerate(queries, 1):
        print(f"\n--- Query {i}: {query} ---")
        
        # Perform query
        result = feedback_rag.query_with_feedback(query, user_id="demo_user")
        feedback_rag.last_session = result  # Store for demo
        
        print(f"Answer: {result['answer'][:200]}...")
        print(f"\nSession ID: {result['session_id']}")
        
        # Simulate user feedback
        if i == 1:
            # Positive feedback
            feedback_rag.collect_feedback(
                session_id=result["session_id"],
                helpful=True,
                relevance_rating=4,
                missing_info="libraries and frameworks",
                irrelevant_parts=""
            )
        elif i == 2:
            # Negative feedback
            feedback_rag.collect_feedback(
                session_id=result["session_id"],
                helpful=False,
                relevance_rating=2,
                missing_info="specific optimization techniques",
                irrelevant_parts="general programming concepts"
            )
        else:
            # Mixed feedback
            feedback_rag.collect_feedback(
                session_id=result["session_id"],
                helpful=True,
                relevance_rating=3,
                missing_info="",
                irrelevant_parts="object-oriented programming"
            )
        
        print("Feedback collected βœ…")
    
    # Show feedback analytics
    print(f"\nπŸ“Š Feedback Analytics:")
    analytics = feedback_rag.get_feedback_analytics()
    print(json.dumps(analytics, indent=2))
    
    # Test improved query (should show optimization)
    print(f"\nπŸ”„ Testing Improved Query:")
    final_result = feedback_rag.query_with_feedback("What is Python good for?", user_id="demo_user")
    print(f"Optimized Query: {final_result['optimized_query']}")
    
    return feedback_rag

feedback_rag = demo_feedback_enhanced_rag()
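
As the comment in `_get_session` notes, a real deployment would persist sessions in a database rather than keep only the last one in memory. A minimal sketch of a JSONL-backed store; the file path and record schema are assumptions for illustration:

python
import json
from pathlib import Path

class SessionStore:
    """Append-only JSONL store for feedback sessions (demo-grade sketch)."""

    def __init__(self, path: str = "rag_sessions.jsonl"):
        self.path = Path(path)

    def save(self, session: dict):
        # Persist only the JSON-serializable fields of the session
        record = {
            "session_id": session["session_id"],
            "original_query": session["original_query"],
            "optimized_query": session["optimized_query"],
            "answer": session["answer"],
        }
        with self.path.open("a") as f:
            f.write(json.dumps(record) + "\n")

    def get(self, session_id: str):
        if not self.path.exists():
            return None
        with self.path.open() as f:
            for line in f:
                record = json.loads(line)
                if record["session_id"] == session_id:
                    return record
        return None

# Usage sketch: a _get_session replacement could call SessionStore().get(session_id)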

πŸ”— Next Steps

Ready to deploy production RAG systems? The takeaways below recap the patterns covered in this guide.

Key Advanced RAG Takeaways:

  • Multi-hop reasoning enables complex question answering
  • Adaptive strategies optimize retrieval for different query types
  • Hierarchical architectures improve efficiency and relevance
  • Feedback learning continuously improves system performance
  • Production optimization requires careful architecture design
  • User experience improves with intelligent retrieval patterns
  • Monitoring and iteration are essential for RAG system success
