
Retrieval Basics - Building Knowledge-Aware AI Systems

Learn to integrate external knowledge sources with LLMs through document retrieval, vector search, and RAG (Retrieval-Augmented Generation) patterns

πŸ” Understanding Information Retrieval ​

Information retrieval in LangChain enables AI systems to access and utilize external knowledge sources, making responses more accurate, current, and contextually relevant than relying solely on pre-trained model knowledge.

🧠 The Knowledge Problem

text
                    🧠 THE KNOWLEDGE LIMITATION PROBLEM 🧠
                        (Why retrieval matters)

    ┌────────────────────────────────────────────────────────────────┐
    │                     LLM LIMITATIONS                            │
    │                                                                │
    │  📅 Training Cutoff: Knowledge frozen at training time         │
    │  🧠 Hallucination: May generate plausible but false info       │
    │  📊 Context Size: Limited input context window                 │
    │  🎯 Domain Gaps: Missing specialized knowledge                 │
    │  🔄 Static Memory: Can't learn from new information            │
    └─────────────────────┬──────────────────────────────────────────┘
                          │
                          ▼ RETRIEVAL SOLUTION
    ┌────────────────────────────────────────────────────────────────┐
    │                  RETRIEVAL-AUGMENTED GENERATION                │
    │                                                                │
    │  📚 Dynamic Knowledge: Access current information              │
    │  🎯 Accurate Sources: Ground responses in real documents       │
    │  📈 Scalable Context: Process large knowledge bases            │
    │  🔄 Updatable: Add new information without retraining          │
    │  ✅ Verifiable: Cite sources for transparency                  │
    └────────────────────────────────────────────────────────────────┘
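
Before diving into each component, here is a minimal sketch of that retrieval-augmented flow end to end: load documents, split them, embed them into a vector store, retrieve the most relevant chunks, and ground the LLM's answer in them. It assumes the langchain, langchain-community, langchain-openai, and chromadb packages are installed, that OPENAI_API_KEY is set, and that the file path is a placeholder; the rest of this page builds out each step in detail.

python
# Minimal RAG sketch: load -> split -> embed -> retrieve -> generate.
# Paths and model choices are illustrative assumptions, not requirements.
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 1. Load and split the external knowledge source
docs = TextLoader("data/documents/guide.txt").load()
chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_documents(docs)

# 2. Embed the chunks and expose them through a retriever
retriever = Chroma.from_documents(chunks, OpenAIEmbeddings()).as_retriever(search_kwargs={"k": 4})

# 3. Ground the model's answer in the retrieved chunks
prompt = ChatPromptTemplate.from_template(
    "Answer using only this context:\n\n{context}\n\nQuestion: {question}"
)
rag = (
    {"context": retriever | (lambda docs: "\n\n".join(d.page_content for d in docs)),
     "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI()
    | StrOutputParser()
)

print(rag.invoke("What does the guide say about error handling?"))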

πŸ—οΈ Core Retrieval Components ​

📄 Document Loaders

python
from langchain_community.document_loaders import (
    TextLoader, PyPDFLoader, WebBaseLoader, 
    DirectoryLoader, CSVLoader, JSONLoader
)
from langchain_core.documents import Document

# Text file loading
text_loader = TextLoader("data/documents/guide.txt")
text_docs = text_loader.load()

print(f"Loaded {len(text_docs)} documents")
print(f"First doc content: {text_docs[0].page_content[:200]}...")
print(f"Metadata: {text_docs[0].metadata}")

# PDF loading with metadata
pdf_loader = PyPDFLoader("data/reports/annual_report.pdf")
pdf_docs = pdf_loader.load()

for i, doc in enumerate(pdf_docs[:3]):
    print(f"Page {i+1}: {doc.page_content[:100]}...")
    print(f"Page metadata: {doc.metadata}")

# Web scraping
web_loader = WebBaseLoader([
    "https://python.langchain.com/docs/introduction/",
    "https://python.langchain.com/docs/get_started/quickstart"
])
web_docs = web_loader.load()

print(f"Loaded {len(web_docs)} web pages")

# Directory loading with filtering
directory_loader = DirectoryLoader(
    "data/knowledge_base/",
    glob="**/*.md",  # Only markdown files
    loader_cls=TextLoader,
    show_progress=True
)
directory_docs = directory_loader.load()

# CSV loading with custom processing
csv_loader = CSVLoader(
    file_path="data/faq.csv",
    csv_args={
        'delimiter': ',',
        'quotechar': '"',
        'fieldnames': ['question', 'answer', 'category']
    }
)
csv_docs = csv_loader.load()

# JSON loading with custom parsing
json_loader = JSONLoader(
    file_path="data/products.json",
    jq_schema='.products[]',  # Extract products array
    text_content=False
)
json_docs = json_loader.load()

# Custom document creation
def create_custom_documents():
    """Create documents from custom data sources"""
    custom_docs = []
    
    # Example: Convert API responses to documents
    api_data = [
        {"title": "Python Basics", "content": "Python is...", "author": "John"},
        {"title": "AI Fundamentals", "content": "AI involves...", "author": "Jane"}
    ]
    
    for item in api_data:
        doc = Document(
            page_content=item["content"],
            metadata={
                "title": item["title"],
                "author": item["author"],
                "source": "api",
                "timestamp": "2024-01-01"
            }
        )
        custom_docs.append(doc)
    
    return custom_docs

custom_docs = create_custom_documents()
print(f"Created {len(custom_docs)} custom documents")

βœ‚οΈ Text Splitters ​

python
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter,
    SpacyTextSplitter
)

# Recursive character splitter (most common)
recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,        # Target chunk size
    chunk_overlap=200,      # Overlap between chunks
    length_function=len,    # How to measure length
    separators=["\n\n", "\n", " ", ""]  # Split hierarchy
)

# Split documents
sample_text = """
This is a long document that needs to be split into smaller chunks for processing.
It contains multiple paragraphs and sections.

Each section covers different topics that should ideally stay together when possible.
The splitter will try to maintain semantic coherence while respecting size limits.

This helps maintain context while making the text manageable for embedding and retrieval.
"""

chunks = recursive_splitter.split_text(sample_text)
print(f"Split into {len(chunks)} chunks:")
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {chunk[:100]}...")

# Split loaded documents
split_docs = recursive_splitter.split_documents(text_docs)
print(f"Original docs: {len(text_docs)}, Split docs: {len(split_docs)}")

# Token-based splitter (for token limits)
token_splitter = TokenTextSplitter(
    chunk_size=500,     # 500 tokens per chunk
    chunk_overlap=50    # 50 token overlap
)

token_chunks = token_splitter.split_text(sample_text)
print(f"Token-based chunks: {len(token_chunks)}")

# Code-aware splitter
from langchain.text_splitter import PythonCodeTextSplitter

code_text = '''
def fibonacci(n):
    """Calculate fibonacci number"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

class MathUtils:
    """Utility class for math operations"""
    
    @staticmethod
    def factorial(n):
        if n <= 1:
            return 1
        return n * MathUtils.factorial(n-1)
    
    def gcd(self, a, b):
        while b:
            a, b = b, a % b
        return a
'''

python_splitter = PythonCodeTextSplitter(chunk_size=200, chunk_overlap=0)
code_chunks = python_splitter.split_text(code_text)

print(f"Code chunks: {len(code_chunks)}")
for i, chunk in enumerate(code_chunks):
    print(f"Code chunk {i+1}:\n{chunk}\n---")

# Custom semantic splitter
class SemanticTextSplitter:
    """Split text based on semantic boundaries"""
    
    def __init__(self, chunk_size=1000, overlap=100):
        self.chunk_size = chunk_size
        self.overlap = overlap
    
    def split_text(self, text: str) -> list[str]:
        """Split text at sentence boundaries when possible"""
        import re
        
        # Split into sentences
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            # Check if adding sentence exceeds chunk size
            if len(current_chunk + sentence) > self.chunk_size and current_chunk:
                chunks.append(current_chunk.strip())
                
                # Start new chunk with overlap
                overlap_text = current_chunk[-self.overlap:] if len(current_chunk) > self.overlap else current_chunk
                current_chunk = overlap_text + " " + sentence
            else:
                current_chunk += " " + sentence if current_chunk else sentence
        
        # Add final chunk
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks

semantic_splitter = SemanticTextSplitter(chunk_size=200, overlap=50)
semantic_chunks = semantic_splitter.split_text(sample_text)
print(f"Semantic chunks: {len(semantic_chunks)}")

πŸ” Vector Stores and Embeddings ​

python
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS
from langchain_core.vectorstores import VectorStoreRetriever
import numpy as np

# Initialize embeddings model
embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    chunk_size=1000  # Process embeddings in batches
)

# Create vector store from documents
def create_vector_store(documents, store_type="chroma"):
    """Create and populate vector store"""
    
    if store_type == "chroma":
        # Persistent Chroma store
        vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=embeddings,
            persist_directory="./chroma_db"
        )
    
    elif store_type == "faiss":
        # In-memory FAISS store (faster for smaller datasets)
        vectorstore = FAISS.from_documents(
            documents=documents,
            embedding=embeddings
        )

    else:
        raise ValueError(f"Unsupported store type: {store_type}")

    return vectorstore

# Create vector store
vectorstore = create_vector_store(split_docs, store_type="chroma")

# Basic similarity search
query = "What are the key features of Python?"
similar_docs = vectorstore.similarity_search(query, k=4)

print(f"Found {len(similar_docs)} similar documents:")
for i, doc in enumerate(similar_docs):
    print(f"Doc {i+1}: {doc.page_content[:200]}...")
    print(f"Metadata: {doc.metadata}")
    print("---")

# Similarity search with scores
docs_with_scores = vectorstore.similarity_search_with_score(query, k=3)

print("Documents with similarity scores:")
for doc, score in docs_with_scores:
    print(f"Score: {score:.4f}")
    print(f"Content: {doc.page_content[:150]}...")
    print("---")

# Maximum Marginal Relevance (MMR) search
# Balances relevance with diversity
mmr_docs = vectorstore.max_marginal_relevance_search(
    query=query,
    k=4,
    fetch_k=10,        # Fetch more candidates
    lambda_mult=0.7    # Balance relevance vs diversity
)

print(f"MMR search found {len(mmr_docs)} diverse documents")

# Custom similarity threshold
def search_with_threshold(vectorstore, query, threshold=0.8, max_results=10):
    """Search with minimum similarity threshold"""
    docs_with_scores = vectorstore.similarity_search_with_score(
        query, k=max_results
    )
    
    # Filter by threshold
    filtered_docs = [
        (doc, score) for doc, score in docs_with_scores 
        if score <= threshold  # Lower scores = higher similarity in some stores
    ]
    
    return filtered_docs

threshold_results = search_with_threshold(vectorstore, query, threshold=0.5)
print(f"Results above threshold: {len(threshold_results)}")

🎯 Retrievers

python
from langchain_core.retrievers import BaseRetriever
from langchain_openai import ChatOpenAI
from langchain.retrievers import (
    MultiQueryRetriever,
    EnsembleRetriever,
    ContextualCompressionRetriever
)
from langchain.retrievers.document_compressors import LLMChainExtractor

# Basic vector store retriever
basic_retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# Test basic retrieval
retrieved_docs = basic_retriever.get_relevant_documents(query)
print(f"Basic retrieval: {len(retrieved_docs)} documents")

# Multi-query retriever (generates multiple query variations)
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=basic_retriever,
    llm=ChatOpenAI(temperature=0)
)

multi_query_docs = multi_query_retriever.get_relevant_documents(
    "How to handle errors in Python?"
)
print(f"Multi-query retrieval: {len(multi_query_docs)} documents")

# Ensemble retriever (combines multiple retrieval methods)
# Create a keyword-based retriever
from langchain_community.retrievers import BM25Retriever  # requires the rank_bm25 package

# Extract text content for BM25
doc_texts = [doc.page_content for doc in split_docs]
bm25_retriever = BM25Retriever.from_texts(doc_texts)
bm25_retriever.k = 3

# Combine vector and keyword search
ensemble_retriever = EnsembleRetriever(
    retrievers=[basic_retriever, bm25_retriever],
    weights=[0.7, 0.3]  # Favor vector search
)

ensemble_docs = ensemble_retriever.get_relevant_documents(query)
print(f"Ensemble retrieval: {len(ensemble_docs)} documents")

# Contextual compression retriever
compressor = LLMChainExtractor.from_llm(ChatOpenAI(temperature=0))

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=basic_retriever
)

compressed_docs = compression_retriever.get_relevant_documents(
    "Explain Python data structures"
)

print("Compressed retrieval results:")
for doc in compressed_docs:
    print(f"Compressed: {doc.page_content}")
    print("---")

# Custom hybrid retriever
class HybridRetriever(BaseRetriever):
    """Custom retriever combining vector and keyword strategies"""

    # BaseRetriever is a Pydantic model, so sub-retrievers are declared as fields
    vector_retriever: BaseRetriever
    keyword_retriever: BaseRetriever

    def _get_relevant_documents(self, query: str, *, run_manager=None) -> list:
        """Retrieve from both sources, then deduplicate"""

        # Get results from both retrievers
        vector_docs = self.vector_retriever.get_relevant_documents(query)
        keyword_docs = self.keyword_retriever.get_relevant_documents(query)

        # Combine and deduplicate
        all_docs = vector_docs + keyword_docs
        unique_docs = self._deduplicate_documents(all_docs)

        # A reranking step (e.g. a cross-encoder) could be added here
        return unique_docs[:10]  # Return top 10

    def _deduplicate_documents(self, docs):
        """Remove duplicate documents based on content similarity"""
        seen_content = set()
        unique_docs = []

        for doc in docs:
            # Simple deduplication based on first 100 characters
            content_hash = hash(doc.page_content[:100])
            if content_hash not in seen_content:
                seen_content.add(content_hash)
                unique_docs.append(doc)

        return unique_docs

hybrid_retriever = HybridRetriever(
    vector_retriever=basic_retriever,
    keyword_retriever=bm25_retriever
)
hybrid_docs = hybrid_retriever.get_relevant_documents(query)
print(f"Hybrid retrieval: {len(hybrid_docs)} documents")

🔄 RAG Implementation Patterns

🎯 Basic RAG Chain

python
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

def format_docs(docs):
    """Format retrieved documents for context"""
    return "\n\n".join([f"Source {i+1}:\n{doc.page_content}" for i, doc in enumerate(docs)])

# Basic RAG chain
rag_chain = (
    {"context": basic_retriever | format_docs, "question": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("""
    Answer the question based on the following context:

    Context:
    {context}

    Question: {question}

    Instructions:
    - Use only information from the provided context
    - If the answer isn't in the context, say "I don't have enough information"
    - Cite specific sources when possible
    - Be concise but comprehensive

    Answer:""")
    | ChatOpenAI()
    | StrOutputParser()
)

# Test basic RAG
rag_answer = rag_chain.invoke("What are the best practices for Python coding?")
print(f"RAG Answer: {rag_answer}")

# RAG with source attribution
def format_docs_with_sources(docs):
    """Format docs with source tracking"""
    formatted_docs = []
    sources = []
    
    for i, doc in enumerate(docs):
        source_id = f"[{i+1}]"
        sources.append({
            "id": source_id,
            "metadata": doc.metadata,
            "content_preview": doc.page_content[:100] + "..."
        })
        
        formatted_docs.append(f"{source_id} {doc.page_content}")
    
    return {
        "context": "\n\n".join(formatted_docs),
        "sources": sources
    }

# Enhanced RAG with sources
enhanced_rag_chain = (
    RunnablePassthrough.assign(
        docs=lambda x: basic_retriever.invoke(x["question"])
    )
    | RunnablePassthrough.assign(
        context_data=lambda x: format_docs_with_sources(x["docs"])
    )
    | RunnablePassthrough.assign(
        context=lambda x: x["context_data"]["context"]
    )
    | RunnablePassthrough.assign(
        response=ChatPromptTemplate.from_template("""
        Based on the following context, answer the question. Include source references.

        Context:
        {context}

        Question: {question}

        Answer with source citations:""")
        | ChatOpenAI()
        | StrOutputParser()
    )
)

enhanced_result = enhanced_rag_chain.invoke({
    "question": "How do I handle exceptions in Python?"
})

print(f"Enhanced answer: {enhanced_result['response']}")
print(f"Sources used: {enhanced_result['context_data']['sources']}")

πŸ” Advanced RAG Patterns ​

python
# Multi-step RAG with query analysis
def create_multi_step_rag():
    """RAG with query analysis and refinement"""
    
    # Step 1: Analyze and expand query
    query_analyzer = (
        ChatPromptTemplate.from_template("""
        Analyze this question and provide:
        1. Key concepts to search for
        2. Alternative phrasings
        3. Related topics to consider

        Question: {question}

        Analysis:""")
        | ChatOpenAI()
        | StrOutputParser()
    )
    
    # Step 2: Enhanced retrieval
    def enhanced_retrieval(inputs):
        original_query = inputs["question"]
        analysis = inputs["analysis"]
        
        # Get docs for original query
        original_docs = basic_retriever.get_relevant_documents(original_query)
        
        # Extract key concepts for additional searches
        # In practice, you'd parse the analysis more carefully
        additional_docs = []
        if "concepts:" in analysis.lower():
            # Simplified concept extraction
            concepts = ["Python", "programming", "best practices"]  # Would parse from analysis
            for concept in concepts:
                concept_docs = basic_retriever.get_relevant_documents(concept)
                additional_docs.extend(concept_docs[:2])
        
        # Combine and deduplicate
        all_docs = original_docs + additional_docs
        unique_docs = {doc.page_content: doc for doc in all_docs}
        return list(unique_docs.values())[:8]
    
    # Step 3: Answer generation
    answer_generator = (
        ChatPromptTemplate.from_template("""
        Question Analysis: {analysis}
        
        Based on the retrieved context, provide a comprehensive answer:

        Context:
        {context}

        Original Question: {question}

        Comprehensive Answer:""")
        | ChatOpenAI()
        | StrOutputParser()
    )
    
    # Combine steps
    multi_step_chain = (
        RunnablePassthrough.assign(analysis=query_analyzer)
        | RunnablePassthrough.assign(
            retrieved_docs=enhanced_retrieval
        )
        | RunnablePassthrough.assign(
            context=lambda x: format_docs(x["retrieved_docs"])
        )
        | answer_generator
    )
    
    return multi_step_chain

multi_step_rag = create_multi_step_rag()
complex_answer = multi_step_rag.invoke({
    "question": "What are the most important Python concepts for beginners?"
})

print(f"Multi-step RAG answer: {complex_answer}")

# Conversational RAG with memory
from langchain.memory import ConversationBufferMemory

class ConversationalRAG:
    """RAG system with conversation memory"""
    
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
    
    def ask(self, question: str) -> str:
        """Ask question with conversation context"""
        
        # Get conversation history
        history = self.memory.chat_memory.messages
        
        # Enhance query with context
        if history:
            context_enhanced_query = f"""
            Previous conversation context: {self._format_history(history)}
            
            Current question: {question}
            
            Enhanced search query considering context:"""
            
            enhanced_query = self.llm.invoke(context_enhanced_query).content
        else:
            enhanced_query = question
        
        # Retrieve relevant documents
        docs = self.retriever.get_relevant_documents(enhanced_query)
        context = format_docs(docs)
        
        # Generate answer with conversation awareness
        prompt = f"""
        Conversation History:
        {self._format_history(history)}
        
        Retrieved Context:
        {context}
        
        Current Question: {question}
        
        Answer considering both conversation context and retrieved information:"""
        
        answer = self.llm.invoke(prompt).content
        
        # Save to memory
        self.memory.chat_memory.add_user_message(question)
        self.memory.chat_memory.add_ai_message(answer)
        
        return answer
    
    def _format_history(self, messages):
        """Format conversation history"""
        formatted = []
        for msg in messages[-6:]:  # Last 3 exchanges
            role = "Human" if msg.type == "human" else "Assistant"
            formatted.append(f"{role}: {msg.content}")
        return "\n".join(formatted)

conversational_rag = ConversationalRAG(basic_retriever, ChatOpenAI())

# Test conversational flow
response1 = conversational_rag.ask("What is Python?")
print(f"Response 1: {response1}")

response2 = conversational_rag.ask("What are its main advantages?")
print(f"Response 2: {response2}")

response3 = conversational_rag.ask("How do I get started learning it?")
print(f"Response 3: {response3}")

🎯 Retrieval Optimization Strategies

⚡ Performance Optimization

python
import time
from typing import List, Dict, Any

class OptimizedRetriever:
    """High-performance retriever with caching and batch processing"""
    
    def __init__(self, vectorstore, cache_size=100):
        self.vectorstore = vectorstore
        self.cache = {}
        self.cache_size = cache_size
        self.stats = {"hits": 0, "misses": 0, "avg_time": 0}
    
    def retrieve_with_cache(self, query: str, k: int = 5) -> List[Any]:
        """Retrieve with caching for performance"""
        cache_key = f"{query}_{k}"
        
        # Check cache
        if cache_key in self.cache:
            self.stats["hits"] += 1
            return self.cache[cache_key]
        
        # Cache miss - perform retrieval
        start_time = time.time()
        docs = self.vectorstore.similarity_search(query, k=k)
        retrieval_time = time.time() - start_time
        
        # Update stats
        self.stats["misses"] += 1
        total_requests = self.stats["hits"] + self.stats["misses"]
        self.stats["avg_time"] = (
            (self.stats["avg_time"] * (total_requests - 1) + retrieval_time) 
            / total_requests
        )
        
        # Cache result
        if len(self.cache) >= self.cache_size:
            # Remove oldest entry
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        
        self.cache[cache_key] = docs
        return docs
    
    def batch_retrieve(self, queries: List[str], k: int = 5) -> Dict[str, List[Any]]:
        """Batch retrieval for efficiency"""
        results = {}
        
        # Separate cached and uncached queries
        cached_queries = []
        uncached_queries = []
        
        for query in queries:
            cache_key = f"{query}_{k}"
            if cache_key in self.cache:
                results[query] = self.cache[cache_key]
                cached_queries.append(query)
            else:
                uncached_queries.append(query)
        
        # Process uncached queries in batch
        if uncached_queries:
            for query in uncached_queries:
                docs = self.vectorstore.similarity_search(query, k=k)
                results[query] = docs
                
                # Cache result
                cache_key = f"{query}_{k}"
                self.cache[cache_key] = docs
        
        return results
    
    def get_stats(self) -> Dict[str, Any]:
        """Get performance statistics"""
        total_requests = self.stats["hits"] + self.stats["misses"]
        hit_rate = self.stats["hits"] / total_requests if total_requests > 0 else 0
        
        return {
            "cache_hit_rate": hit_rate,
            "total_requests": total_requests,
            "average_retrieval_time": self.stats["avg_time"],
            "cache_size": len(self.cache)
        }

# Test optimized retriever
optimized_retriever = OptimizedRetriever(vectorstore)

# Single queries
docs1 = optimized_retriever.retrieve_with_cache("Python basics")
docs2 = optimized_retriever.retrieve_with_cache("Python basics")  # Should hit cache

# Batch queries
batch_queries = [
    "machine learning algorithms",
    "data structures in Python",
    "web development frameworks",
    "Python basics"  # Already cached
]

batch_results = optimized_retriever.batch_retrieve(batch_queries)
print(f"Batch results: {len(batch_results)} queries processed")

# Performance stats
stats = optimized_retriever.get_stats()
print(f"Retriever stats: {stats}")

🎯 Query Enhancement

python
class QueryEnhancer:
    """Enhance queries for better retrieval results"""
    
    def __init__(self, llm):
        self.llm = llm
    
    def expand_query(self, query: str) -> List[str]:
        """Generate multiple query variations"""
        expansion_prompt = f"""
        Generate 3-5 alternative phrasings for this search query that would help find relevant information:
        
        Original query: {query}
        
        Alternative queries (one per line):"""
        
        response = self.llm.invoke(expansion_prompt).content
        
        # Parse response into list
        alternatives = [
            line.strip() 
            for line in response.split('\n') 
            if line.strip() and not line.strip().startswith('Alternative')
        ]
        
        return [query] + alternatives[:4]  # Original + up to 4 alternatives
    
    def extract_keywords(self, query: str) -> List[str]:
        """Extract key search terms"""
        keyword_prompt = f"""
        Extract the most important keywords and phrases from this query for document search:
        
        Query: {query}
        
        Keywords (comma-separated):"""
        
        response = self.llm.invoke(keyword_prompt).content
        keywords = [kw.strip() for kw in response.split(',')]
        
        return keywords
    
    def contextualize_query(self, query: str, conversation_history: List[str]) -> str:
        """Add conversation context to query"""
        if not conversation_history:
            return query
        
        # Include only the last 4 exchanges in the prompt text
        recent_history = "\n".join(conversation_history[-4:])

        context_prompt = f"""
        Given this conversation history, rewrite the current query to be more specific and contextual:
        
        Conversation history:
        {recent_history}
        
        Current query: {query}
        
        Contextualized query:"""
        
        response = self.llm.invoke(context_prompt).content
        return response.strip()

# Enhanced retrieval pipeline
class EnhancedRetrievalPipeline:
    """Complete retrieval pipeline with query enhancement"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.query_enhancer = QueryEnhancer(llm)
        self.optimized_retriever = OptimizedRetriever(vectorstore)
    
    def retrieve(self, query: str, conversation_history: List[str] = None, k: int = 5) -> Dict[str, Any]:
        """Enhanced retrieval with multiple strategies"""
        
        # Step 1: Contextualize query if history available
        if conversation_history:
            contextualized_query = self.query_enhancer.contextualize_query(
                query, conversation_history
            )
        else:
            contextualized_query = query
        
        # Step 2: Expand query
        query_variations = self.query_enhancer.expand_query(contextualized_query)
        
        # Step 3: Retrieve for each variation
        all_docs = []
        for variation in query_variations:
            docs = self.optimized_retriever.retrieve_with_cache(variation, k=3)
            all_docs.extend(docs)
        
        # Step 4: Deduplicate and rank
        unique_docs = self._deduplicate_and_rank(all_docs, query, k)
        
        # Step 5: Extract keywords for additional context
        keywords = self.query_enhancer.extract_keywords(query)
        
        return {
            "documents": unique_docs,
            "original_query": query,
            "contextualized_query": contextualized_query,
            "query_variations": query_variations,
            "keywords": keywords,
            "total_retrieved": len(all_docs),
            "final_count": len(unique_docs)
        }
    
    def _deduplicate_and_rank(self, docs: List[Any], original_query: str, k: int) -> List[Any]:
        """Remove duplicates and rank by relevance"""
        
        # Simple deduplication by content similarity
        seen_content = set()
        unique_docs = []
        
        for doc in docs:
            content_hash = hash(doc.page_content[:200])  # First 200 chars
            if content_hash not in seen_content:
                seen_content.add(content_hash)
                unique_docs.append(doc)
        
        # In practice, you might use a reranking model here
        # For now, just return top k
        return unique_docs[:k]

# Test enhanced pipeline
enhanced_pipeline = EnhancedRetrievalPipeline(vectorstore, ChatOpenAI())

# Test with conversation context
conversation_history = [
    "Human: What is Python?",
    "Assistant: Python is a programming language...",
    "Human: What about its libraries?"
]

enhanced_result = enhanced_pipeline.retrieve(
    "How do I use them?",
    conversation_history=conversation_history
)

print(f"Enhanced retrieval result: {enhanced_result}")
print(f"Found {enhanced_result['final_count']} unique documents")
print(f"Query variations: {enhanced_result['query_variations']}")

🔄 Document Management and Updates

πŸ“ Dynamic Document Updates ​

python
class DynamicKnowledgeBase:
    """Knowledge base with dynamic updates and versioning"""
    
    def __init__(self, vectorstore, embeddings):
        self.vectorstore = vectorstore
        self.embeddings = embeddings
        self.document_registry = {}  # Track document metadata
        self.version_counter = 0
    
    def add_documents(self, documents: List[Document], source: str = "unknown") -> List[str]:
        """Add new documents to the knowledge base"""
        
        # Add metadata
        for doc in documents:
            doc.metadata.update({
                "source": source,
                "added_timestamp": time.time(),
                "version": self.version_counter
            })
        
        # Add to vector store
        doc_ids = self.vectorstore.add_documents(documents)
        
        # Update registry
        for doc_id, doc in zip(doc_ids, documents):
            self.document_registry[doc_id] = {
                "content_preview": doc.page_content[:100],
                "metadata": doc.metadata,
                "status": "active"
            }
        
        self.version_counter += 1
        return doc_ids
    
    def update_document(self, doc_id: str, new_content: str, metadata_updates: Dict = None):
        """Update existing document"""
        
        if doc_id not in self.document_registry:
            raise ValueError(f"Document {doc_id} not found")
        
        # Mark old version as inactive
        self.document_registry[doc_id]["status"] = "updated"
        
        # Create new document
        old_metadata = self.document_registry[doc_id]["metadata"].copy()
        if metadata_updates:
            old_metadata.update(metadata_updates)
        
        old_metadata.update({
            "updated_timestamp": time.time(),
            "version": self.version_counter,
            "replaces": doc_id
        })
        
        new_doc = Document(page_content=new_content, metadata=old_metadata)
        new_doc_ids = self.add_documents([new_doc], source="update")
        
        return new_doc_ids[0]
    
    def delete_document(self, doc_id: str):
        """Soft delete document"""
        if doc_id in self.document_registry:
            self.document_registry[doc_id]["status"] = "deleted"
            # Note: Actual removal from vector store depends on implementation
    
    def search_with_filters(self, query: str, filters: Dict = None, k: int = 5) -> List[Document]:
        """Search with metadata filters"""
        
        # Basic search
        docs = self.vectorstore.similarity_search(query, k=k*2)  # Get more to filter
        
        # Apply filters
        if filters:
            filtered_docs = []
            for doc in docs:
                match = True
                for key, value in filters.items():
                    if key not in doc.metadata or doc.metadata[key] != value:
                        match = False
                        break
                
                if match:
                    filtered_docs.append(doc)
            
            docs = filtered_docs[:k]
        
        # Filter out inactive documents
        active_docs = []
        for doc in docs:
            doc_id = self._get_doc_id(doc)  # Implementation-specific
            if doc_id and self.document_registry.get(doc_id, {}).get("status") == "active":
                active_docs.append(doc)
        
        return active_docs[:k]
    
    def _get_doc_id(self, doc: Document) -> str:
        """Get document ID - implementation specific"""
        # This would depend on your vector store implementation
        return doc.metadata.get("doc_id", "")
    
    def get_knowledge_base_stats(self) -> Dict[str, Any]:
        """Get knowledge base statistics"""
        total_docs = len(self.document_registry)
        active_docs = sum(1 for doc in self.document_registry.values() if doc["status"] == "active")
        
        return {
            "total_documents": total_docs,
            "active_documents": active_docs,
            "current_version": self.version_counter,
            "sources": list(set(doc["metadata"]["source"] for doc in self.document_registry.values()))
        }

# Example usage
dynamic_kb = DynamicKnowledgeBase(vectorstore, embeddings)

# Add new documents
new_docs = [
    Document(page_content="Python 3.12 introduces new features...", metadata={"topic": "python"}),
    Document(page_content="Machine learning best practices include...", metadata={"topic": "ml"})
]

doc_ids = dynamic_kb.add_documents(new_docs, source="manual_entry")
print(f"Added documents: {doc_ids}")

# Search with filters
filtered_results = dynamic_kb.search_with_filters(
    "Python features",
    filters={"topic": "python"},
    k=3
)

print(f"Filtered search: {len(filtered_results)} results")

# Get stats
stats = dynamic_kb.get_knowledge_base_stats()
print(f"Knowledge base stats: {stats}")

🔗 Integration Patterns

🔄 Complete RAG System

python
class ProductionRAGSystem:
    """Production-ready RAG system with monitoring and error handling"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.setup_components()
        self.setup_monitoring()
    
    def setup_components(self):
        """Initialize all RAG components"""
        
        # Document processing
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.get("chunk_size", 1000),
            chunk_overlap=self.config.get("chunk_overlap", 200)
        )
        
        # Embeddings and vector store
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            persist_directory=self.config.get("vector_db_path", "./chroma_db"),
            embedding_function=self.embeddings
        )
        
        # Enhanced retrieval pipeline
        self.retrieval_pipeline = EnhancedRetrievalPipeline(
            self.vectorstore, 
            ChatOpenAI()
        )
        
        # Response generation
        self.response_chain = self._create_response_chain()
    
    def _create_response_chain(self):
        """Create the response generation chain"""
        return (
            ChatPromptTemplate.from_template("""
            You are a helpful AI assistant. Answer the question based on the provided context.
            
            Context:
            {context}
            
            Question: {question}
            
            Instructions:
            - Provide accurate, helpful answers based on the context
            - If information is not in the context, say so clearly
            - Include relevant source references
            - Be concise but comprehensive
            
            Answer:""")
            | ChatOpenAI(temperature=self.config.get("temperature", 0.1))
            | StrOutputParser()
        )
    
    def setup_monitoring(self):
        """Setup monitoring and logging"""
        self.query_log = []
        self.performance_metrics = {
            "total_queries": 0,
            "average_response_time": 0,
            "retrieval_success_rate": 0
        }
    
    def add_documents(self, file_paths: List[str]) -> Dict[str, Any]:
        """Add documents from files"""
        results = {"success": [], "failed": []}
        
        for file_path in file_paths:
            try:
                # Load document
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                elif file_path.endswith('.txt'):
                    loader = TextLoader(file_path)
                else:
                    results["failed"].append(f"{file_path}: Unsupported format")
                    continue
                
                docs = loader.load()
                
                # Split documents
                split_docs = self.text_splitter.split_documents(docs)
                
                # Add to vector store
                self.vectorstore.add_documents(split_docs)
                
                results["success"].append(f"{file_path}: {len(split_docs)} chunks added")
                
            except Exception as e:
                results["failed"].append(f"{file_path}: {str(e)}")
        
        return results
    
    def query(self, question: str, conversation_history: List[str] = None) -> Dict[str, Any]:
        """Process a query through the RAG system"""
        start_time = time.time()
        
        try:
            # Enhanced retrieval
            retrieval_result = self.retrieval_pipeline.retrieve(
                question, 
                conversation_history or []
            )
            
            # Format context
            context = format_docs(retrieval_result["documents"])
            
            # Generate response
            response = self.response_chain.invoke({
                "context": context,
                "question": question
            })
            
            # Calculate metrics
            response_time = time.time() - start_time
            
            # Log query
            query_log_entry = {
                "question": question,
                "response_time": response_time,
                "num_docs_retrieved": len(retrieval_result["documents"]),
                "success": True,
                "timestamp": time.time()
            }
            self.query_log.append(query_log_entry)
            
            # Update metrics
            self._update_metrics(query_log_entry)
            
            return {
                "answer": response,
                "sources": retrieval_result["documents"],
                "query_variations": retrieval_result["query_variations"],
                "response_time": response_time,
                "success": True
            }
            
        except Exception as e:
            error_time = time.time() - start_time
            
            # Log error
            error_log_entry = {
                "question": question,
                "response_time": error_time,
                "success": False,
                "error": str(e),
                "timestamp": time.time()
            }
            self.query_log.append(error_log_entry)
            
            return {
                "answer": "I apologize, but I encountered an error processing your question.",
                "error": str(e),
                "response_time": error_time,
                "success": False
            }
    
    def _update_metrics(self, query_log_entry: Dict[str, Any]):
        """Update performance metrics"""
        self.performance_metrics["total_queries"] += 1
        
        # Update average response time
        total_queries = self.performance_metrics["total_queries"]
        current_avg = self.performance_metrics["average_response_time"]
        new_time = query_log_entry["response_time"]
        
        self.performance_metrics["average_response_time"] = (
            (current_avg * (total_queries - 1) + new_time) / total_queries
        )
        
        # Update success rate
        successful_queries = sum(1 for log in self.query_log if log["success"])
        self.performance_metrics["retrieval_success_rate"] = (
            successful_queries / total_queries
        )
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get system performance metrics"""
        return {
            "performance": self.performance_metrics,
            "recent_queries": len(self.query_log),
            "vector_store_stats": self._get_vector_store_stats()
        }
    
    def _get_vector_store_stats(self) -> Dict[str, Any]:
        """Get vector store statistics"""
        # This would depend on your vector store implementation
        return {"status": "active"}

# Example configuration and usage
rag_config = {
    "chunk_size": 1000,
    "chunk_overlap": 200,
    "vector_db_path": "./production_chroma_db",
    "temperature": 0.1
}

# Initialize production RAG system
rag_system = ProductionRAGSystem(rag_config)

# Add documents
file_results = rag_system.add_documents([
    "data/python_guide.pdf",
    "data/best_practices.txt"
])
print(f"Document loading results: {file_results}")

# Query the system
result = rag_system.query(
    "What are the best practices for error handling in Python?",
    conversation_history=["Previous question about Python basics"]
)

print(f"RAG Response: {result['answer']}")
print(f"Response time: {result['response_time']:.2f}s")

# Get system metrics
metrics = rag_system.get_metrics()
print(f"System metrics: {metrics}")

🔗 Next Steps

Ready to build more sophisticated AI systems? Before moving on, keep these key retrieval takeaways in mind:

  • RAG is essential for accurate, current, and verifiable AI responses
  • Document processing quality directly impacts retrieval effectiveness
  • Vector search excels at semantic similarity; keyword search handles exact matches
  • Query enhancement significantly improves retrieval quality
  • Hybrid approaches combining multiple strategies work best
  • Monitoring and optimization are crucial for production systems
  • Dynamic updates enable evolving knowledge bases
