Retrieval Augmented Generation (RAG) - Knowledge-Enhanced AI

Build AI systems that combine the power of language models with external knowledge sources

πŸ” What is Retrieval Augmented Generation (RAG)? ​

RAG (Retrieval Augmented Generation) is a powerful technique that enhances language models by providing them with relevant external information during the generation process. Instead of relying solely on training data, RAG systems can access and utilize up-to-date, domain-specific knowledge from external sources.

Simple Analogy: Think of RAG as giving an AI assistant access to a vast library - instead of only answering from memory, it can quickly look up relevant books, articles, and documents to provide more accurate, current, and detailed responses.
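
Before building the full pipeline, the core loop is worth seeing in isolation. The sketch below is purely illustrative: it fakes retrieval with keyword overlap over an in-memory list of snippets, and stops at the assembled prompt where a real system would call an LLM.

python
# Minimal, self-contained sketch of the RAG loop: retrieve -> augment -> generate.
# The knowledge base and scoring here are toy stand-ins for a real vector search.
knowledge_base = [
    "RAG grounds answers in external data retrieved at query time.",
    "Vector databases store embeddings for fast semantic similarity search.",
    "Chunk overlap preserves context across document boundaries.",
]

def retrieve(query: str, k: int = 2) -> list:
    """Toy retriever: rank snippets by word overlap with the query."""
    query_terms = set(query.lower().split())
    return sorted(
        knowledge_base,
        key=lambda text: len(query_terms & set(text.lower().split())),
        reverse=True,
    )[:k]

def build_prompt(query: str, context: list) -> str:
    """Augment the user question with the retrieved context."""
    return "Context:\n" + "\n".join(context) + f"\n\nQuestion: {query}\nAnswer:"

question = "How does RAG ground its answers?"
prompt = build_prompt(question, retrieve(question))
print(prompt)  # a real system would now send this prompt to an LLM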

🎯 RAG Architecture Overview

text
                    πŸ” RAG SYSTEM ARCHITECTURE πŸ”
                   (Knowledge-Enhanced Generation)

    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                     πŸ“Š KNOWLEDGE BASE                          β”‚
    β”‚                   "External Information"                        β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚         🎯 RAG PIPELINE                 β”‚
    β””β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚        β”‚        β”‚        β”‚
       β–Ό        β–Ό        β–Ό        β–Ό
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚πŸ“₯ LOAD  β”‚βœ‚οΈ SPLIT β”‚πŸ”’ EMBED β”‚πŸ’Ύ STORE β”‚
  β”‚DOCUMENTSβ”‚ TEXT    β”‚ CHUNKS  β”‚VECTORS  β”‚
  β”‚         β”‚         β”‚         β”‚         β”‚
  β”‚PDFs     β”‚Smart    β”‚Semantic β”‚Vector   β”‚
  β”‚Websites β”‚Chunking β”‚Vectors  β”‚Database β”‚
  β”‚APIs     β”‚Context  β”‚Search   β”‚Index    β”‚
  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜
       β”‚         β”‚         β”‚         β”‚
       β–Ό         β–Ό         β–Ό         β–Ό
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚                    πŸ” RETRIEVAL PROCESS                        β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                       β”‚
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚         πŸ’¬ GENERATION PIPELINE          β”‚
  β””β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
     β”‚        β”‚        β”‚        β”‚
     β–Ό        β–Ό        β–Ό        β–Ό
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  │❓ QUERY β”‚πŸ” SEARCHβ”‚πŸ“ PROMPTβ”‚πŸ€– ANSWERβ”‚
  β”‚USER     β”‚RELEVANT β”‚ASSEMBLY β”‚WITH     β”‚
  β”‚QUESTION β”‚DOCS     β”‚CONTEXT  β”‚CONTEXT  β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

📚 Document Loading & Processing

📥 Multi-Format Document Loaders

Loading documents from various sources and formats

Basic Document Loading

python
from langchain.document_loaders import (
    PyPDFLoader, 
    TextLoader, 
    WebBaseLoader,
    CSVLoader,
    JSONLoader
)

# PDF Document Loading
pdf_loader = PyPDFLoader("path/to/document.pdf")
pdf_documents = pdf_loader.load()

# Web Content Loading
web_loader = WebBaseLoader("https://example.com/article")
web_documents = web_loader.load()

# Text File Loading
text_loader = TextLoader("path/to/file.txt")
text_documents = text_loader.load()

# CSV Data Loading
csv_loader = CSVLoader("path/to/data.csv")
csv_documents = csv_loader.load()

print(f"Loaded {len(pdf_documents)} PDF pages")
print(f"First document preview: {pdf_documents[0].page_content[:200]}...")

Advanced Document Loading with Metadata

python
from langchain.schema import Document
from typing import List
import os
from datetime import datetime

# PyPDFLoader, TextLoader, CSVLoader, and JSONLoader come from the previous example

class EnhancedDocumentLoader:
    def __init__(self, base_directory: str):
        self.base_directory = base_directory
        self.supported_extensions = {
            '.pdf': PyPDFLoader,
            '.txt': TextLoader,
            '.csv': CSVLoader,
            '.json': JSONLoader
        }
    
    def load_documents_with_metadata(self) -> List[Document]:
        """Load all documents with enhanced metadata"""
        documents = []
        
        for root, dirs, files in os.walk(self.base_directory):
            for file in files:
                file_path = os.path.join(root, file)
                file_ext = os.path.splitext(file)[1].lower()
                
                if file_ext in self.supported_extensions:
                    try:
                        loader_class = self.supported_extensions[file_ext]
                        loader = loader_class(file_path)
                        file_docs = loader.load()
                        
                        # Add enhanced metadata
                        for doc in file_docs:
                            doc.metadata.update({
                                'file_path': file_path,
                                'file_name': file,
                                'file_type': file_ext,
                                'file_size': os.path.getsize(file_path),
                                # Store timestamps as ISO strings so metadata stays serializable
                                'created_date': datetime.fromtimestamp(os.path.getctime(file_path)).isoformat(),
                                'modified_date': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat(),
                                'directory': os.path.dirname(file_path)
                            })
                        
                        documents.extend(file_docs)
                        
                    except Exception as e:
                        print(f"Error loading {file_path}: {e}")
        
        return documents
    
    def filter_documents_by_criteria(self, documents: List[Document], criteria: dict) -> List[Document]:
        """Filter documents based on metadata criteria"""
        filtered_docs = []
        
        for doc in documents:
            meets_criteria = True
            
            for key, value in criteria.items():
                if key not in doc.metadata:
                    meets_criteria = False
                    break
                
                # String criteria match as case-insensitive substrings
                if isinstance(value, str) and value.lower() not in str(doc.metadata[key]).lower():
                    meets_criteria = False
                    break
                # Numeric criteria act as minimum thresholds (metadata value must be >= criterion)
                elif isinstance(value, (int, float)) and doc.metadata[key] < value:
                    meets_criteria = False
                    break
            
            if meets_criteria:
                filtered_docs.append(doc)
        
        return filtered_docs

# Usage
enhanced_loader = EnhancedDocumentLoader("./documents")
all_documents = enhanced_loader.load_documents_with_metadata()

# Filter for recent PDF documents
recent_pdfs = enhanced_loader.filter_documents_by_criteria(
    all_documents,
    {'file_type': '.pdf', 'file_size': 1000}  # PDFs of at least 1000 bytes
)

Web Scraping with Custom Processing

python
from langchain.document_loaders import AsyncHtmlLoader
from langchain.document_transformers import Html2TextTransformer
from langchain.schema import Document
from typing import List
from datetime import datetime
import asyncio

class WebContentLoader:
    def __init__(self):
        self.html_transformer = Html2TextTransformer()
    
    async def load_multiple_urls(self, urls: List[str]) -> List[Document]:
        """Load content from multiple URLs asynchronously"""
        loader = AsyncHtmlLoader(urls)
        html_documents = loader.load()  # AsyncHtmlLoader fetches all URLs concurrently under the hood
        
        # Transform HTML to clean text
        text_documents = self.html_transformer.transform_documents(html_documents)
        
        # Add web-specific metadata
        for i, doc in enumerate(text_documents):
            doc.metadata.update({
                'source_url': urls[i],
                'content_type': 'web',
                'scraped_date': datetime.now().isoformat(),
                'word_count': len(doc.page_content.split())
            })
        
        return text_documents
    
    def clean_web_content(self, documents: List[Document]) -> List[Document]:
        """Clean and process web content"""
        cleaned_docs = []
        
        for doc in documents:
            # Remove extra whitespace and newlines
            cleaned_content = ' '.join(doc.page_content.split())
            
            # Remove common web artifacts
            artifacts = ['Cookie Policy', 'Privacy Policy', 'Subscribe to Newsletter']
            for artifact in artifacts:
                cleaned_content = cleaned_content.replace(artifact, '')
            
            # Create new document with cleaned content
            cleaned_doc = Document(
                page_content=cleaned_content,
                metadata=doc.metadata
            )
            cleaned_docs.append(cleaned_doc)
        
        return cleaned_docs

# Usage
web_loader = WebContentLoader()
urls = [
    "https://example.com/article1",
    "https://example.com/article2",
    "https://example.com/article3"
]

# Load and clean web content (asyncio.run drives the coroutine;
# a bare top-level await only works inside notebooks)
web_docs = asyncio.run(web_loader.load_multiple_urls(urls))
cleaned_docs = web_loader.clean_web_content(web_docs)

βœ‚οΈ Text Splitting & Chunking ​

🎯 Smart Chunking Strategies

Optimizing document chunks for effective retrieval

Basic Text Splitting

python
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter
)

# Recursive Character Splitter (Recommended)
recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    separators=["\n\n", "\n", " ", ""]
)

# Split documents
chunks = recursive_splitter.split_documents(documents)
print(f"Created {len(chunks)} chunks from {len(documents)} documents")

# Token-based Splitter
token_splitter = TokenTextSplitter(
    chunk_size=512,
    chunk_overlap=50
)

token_chunks = token_splitter.split_documents(documents)
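
To see what chunk_overlap actually buys you, split a small in-memory string and inspect the boundaries. This is a minimal sketch; the sample text and sizes below are arbitrary.

python
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Tiny sizes so the overlap is visible on a short sample
demo_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=30)

sample_text = (
    "RAG systems load documents, split them into chunks, embed the chunks, "
    "and store the vectors. At query time the most similar chunks are "
    "retrieved and passed to the language model as context."
)

demo_chunks = demo_splitter.split_text(sample_text)
for i, chunk in enumerate(demo_chunks):
    # The tail of each chunk reappears at the head of the next one,
    # so sentences stay readable across chunk boundaries
    print(f"Chunk {i} ({len(chunk)} chars): {chunk!r}")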

Advanced Semantic Chunking

python
from langchain.schema import Document
import re
from typing import List

class SemanticChunker:
    def __init__(self, target_chunk_size: int = 1000, overlap_size: int = 200):
        self.target_chunk_size = target_chunk_size
        self.overlap_size = overlap_size
    
    def chunk_by_sections(self, documents: List[Document]) -> List[Document]:
        """Chunk documents by logical sections (headers, paragraphs)"""
        chunked_docs = []
        
        for doc in documents:
            sections = self._identify_sections(doc.page_content)
            
            for i, section in enumerate(sections):
                if len(section['content']) > self.target_chunk_size:
                    # Further split large sections
                    subsections = self._split_large_section(section['content'])
                    for j, subsection in enumerate(subsections):
                        chunk_doc = Document(
                            page_content=subsection,
                            metadata={
                                **doc.metadata,
                                'chunk_id': f"{doc.metadata.get('file_name', 'doc')}_{i}_{j}",
                                'section_title': section['title'],
                                'chunk_type': 'subsection'
                            }
                        )
                        chunked_docs.append(chunk_doc)
                else:
                    chunk_doc = Document(
                        page_content=section['content'],
                        metadata={
                            **doc.metadata,
                            'chunk_id': f"{doc.metadata.get('file_name', 'doc')}_{i}",
                            'section_title': section['title'],
                            'chunk_type': 'section'
                        }
                    )
                    chunked_docs.append(chunk_doc)
        
        return chunked_docs
    
    def _identify_sections(self, text: str) -> List[dict]:
        """Identify logical sections in text"""
        sections = []
        
        # Split by headers (markdown-style)
        header_pattern = r'^#{1,6}\s+(.+)$'
        lines = text.split('\n')
        
        current_section = {'title': 'Introduction', 'content': ''}
        
        for line in lines:
            header_match = re.match(header_pattern, line)
            if header_match:
                # Save previous section
                if current_section['content'].strip():
                    sections.append(current_section)
                
                # Start new section
                current_section = {
                    'title': header_match.group(1),
                    'content': ''
                }
            else:
                current_section['content'] += line + '\n'
        
        # Add final section
        if current_section['content'].strip():
            sections.append(current_section)
        
        return sections
    
    def _split_large_section(self, content: str) -> List[str]:
        """Split large sections into smaller chunks"""
        # Split by paragraphs first
        paragraphs = content.split('\n\n')
        
        chunks = []
        current_chunk = ""
        
        for paragraph in paragraphs:
            if len(current_chunk + paragraph) <= self.target_chunk_size:
                current_chunk += paragraph + '\n\n'
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = paragraph + '\n\n'
        
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks
    
    def add_contextual_overlap(self, chunks: List[Document]) -> List[Document]:
        """Add contextual overlap between chunks"""
        enhanced_chunks = []
        
        for i, chunk in enumerate(chunks):
            content = chunk.page_content
            
            # Add context from previous chunk
            if i > 0 and chunks[i-1].metadata.get('file_name') == chunk.metadata.get('file_name'):
                prev_content = chunks[i-1].page_content
                overlap_text = prev_content[-self.overlap_size:] if len(prev_content) > self.overlap_size else prev_content
                content = f"[Previous context: ...{overlap_text}]\n\n{content}"
            
            # Add context from next chunk
            if i < len(chunks) - 1 and chunks[i+1].metadata.get('file_name') == chunk.metadata.get('file_name'):
                next_content = chunks[i+1].page_content
                overlap_text = next_content[:self.overlap_size] if len(next_content) > self.overlap_size else next_content
                content = f"{content}\n\n[Next context: {overlap_text}...]"
            
            enhanced_chunk = Document(
                page_content=content,
                metadata={
                    **chunk.metadata,
                    'has_context_overlap': True,
                    'chunk_position': i,
                    'total_chunks': len(chunks)
                }
            )
            enhanced_chunks.append(enhanced_chunk)
        
        return enhanced_chunks

# Usage
semantic_chunker = SemanticChunker(target_chunk_size=800, overlap_size=150)
semantic_chunks = semantic_chunker.chunk_by_sections(documents)
enhanced_chunks = semantic_chunker.add_contextual_overlap(semantic_chunks)

Domain-Specific Chunking

python
import re
from typing import List
from langchain.schema import Document

class CodeDocumentChunker:
    """Specialized chunker for code documentation"""
    
    def __init__(self):
        self.code_patterns = {
            'function': r'def\s+\w+\([^)]*\):',
            'class': r'class\s+\w+.*:',
            'import': r'(?:import|from)\s+\w+',
            'comment': r'#.*$|""".*?"""',
        }
    
    def chunk_code_documentation(self, documents: List[Document]) -> List[Document]:
        """Chunk code documentation preserving logical structure"""
        chunked_docs = []
        
        for doc in documents:
            if self._is_code_document(doc):
                chunks = self._chunk_by_code_structure(doc)
            else:
                chunks = self._chunk_by_markdown_structure(doc)
            
            chunked_docs.extend(chunks)
        
        return chunked_docs
    
    def _is_code_document(self, doc: Document) -> bool:
        """Determine if document contains code"""
        content = doc.page_content
        code_indicators = ['def ', 'class ', 'import ', 'function(', '```python', '```javascript']
        return any(indicator in content for indicator in code_indicators)
    
    def _chunk_by_code_structure(self, doc: Document) -> List[Document]:
        """Chunk by code structure (functions, classes, etc.)"""
        chunks = []
        lines = doc.page_content.split('\n')
        
        current_chunk_lines = []
        current_function_or_class = None
        
        for line in lines:
            # Check for function or class definition
            if re.match(r'(def|class)\s+\w+', line.strip()):
                # Save previous chunk
                if current_chunk_lines:
                    chunk_content = '\n'.join(current_chunk_lines)
                    chunks.append(Document(
                        page_content=chunk_content,
                        metadata={
                            **doc.metadata,
                            'chunk_type': 'code_block',
                            'function_or_class': current_function_or_class
                        }
                    ))
                
                # Start new chunk
                current_chunk_lines = [line]
                current_function_or_class = line.strip()
            else:
                current_chunk_lines.append(line)
        
        # Add final chunk
        if current_chunk_lines:
            chunk_content = '\n'.join(current_chunk_lines)
            chunks.append(Document(
                page_content=chunk_content,
                metadata={
                    **doc.metadata,
                    'chunk_type': 'code_block',
                    'function_or_class': current_function_or_class
                }
            ))
        
        return chunks
    
    def _chunk_by_markdown_structure(self, doc: Document) -> List[Document]:
        """Fallback for non-code documents: split on blank lines (paragraph boundaries)"""
        paragraphs = [p.strip() for p in doc.page_content.split('\n\n') if p.strip()]
        return [
            Document(
                page_content=paragraph,
                metadata={**doc.metadata, 'chunk_type': 'markdown_block'}
            )
            for paragraph in paragraphs
        ]

# Usage
code_chunker = CodeDocumentChunker()
code_chunks = code_chunker.chunk_code_documentation(documents)

🔒 Vector Embeddings & Storage

⚡ Embedding Generation

Converting text chunks into semantic vector representations

Basic Embedding Setup

python
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma, Pinecone
import numpy as np

# OpenAI Embeddings (Recommended for quality)
openai_embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    openai_api_key="your-api-key"
)

# Open Source Alternative
huggingface_embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)

# Test embedding generation
sample_text = "LangChain is a framework for building applications with LLMs"
embedding_vector = openai_embeddings.embed_query(sample_text)
print(f"Embedding dimension: {len(embedding_vector)}")
print(f"First 5 values: {embedding_vector[:5]}")

Custom Embedding Pipeline

python
from sentence_transformers import SentenceTransformer
from typing import List
import numpy as np
import torch

class CustomEmbeddingGenerator:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = None):
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_name = model_name
        self.model = SentenceTransformer(model_name, device=self.device)
        
    def embed_documents(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
        """Generate embeddings for multiple documents"""
        embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            batch_embeddings = self.model.encode(
                batch,
                convert_to_tensor=True,
                device=self.device,
                show_progress_bar=True
            )
            embeddings.extend(batch_embeddings.cpu().numpy().tolist())
        
        return embeddings
    
    def embed_query(self, text: str) -> List[float]:
        """Generate embedding for a single query"""
        embedding = self.model.encode(
            [text],
            convert_to_tensor=True,
            device=self.device
        )
        return embedding.cpu().numpy()[0].tolist()
    
    def calculate_similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
        """Calculate cosine similarity between two embeddings"""
        vec1 = np.array(embedding1)
        vec2 = np.array(embedding2)
        
        dot_product = np.dot(vec1, vec2)
        norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        
        return dot_product / norm_product if norm_product != 0 else 0.0
    
    def get_model_info(self) -> dict:
        """Get information about the embedding model"""
        return {
            'model_name': self.model_name,
            'embedding_dimension': self.model.get_sentence_embedding_dimension(),
            'max_sequence_length': self.model.max_seq_length,
            'device': str(self.device)
        }

# Usage
custom_embedder = CustomEmbeddingGenerator()
model_info = custom_embedder.get_model_info()
print(f"Model info: {model_info}")

# Generate embeddings for document chunks
chunk_texts = [chunk.page_content for chunk in chunks[:10]]  # First 10 chunks
chunk_embeddings = custom_embedder.embed_documents(chunk_texts)
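
The calculate_similarity helper is handy for sanity-checking the vectors before indexing them, e.g. comparing each chunk against a probe query (the query string here is just an example):

python
# Sanity check: how close is each chunk to a probe query?
query_embedding = custom_embedder.embed_query("machine learning")
for i, chunk_embedding in enumerate(chunk_embeddings):
    similarity = custom_embedder.calculate_similarity(query_embedding, chunk_embedding)
    print(f"Chunk {i}: cosine similarity = {similarity:.3f}")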

💾 Vector Database Integration

FAISS Vector Store

python
from langchain.vectorstores import FAISS
from langchain.schema import Document
from typing import List

class FAISSVectorStore:
    def __init__(self, embeddings_model):
        self.embeddings = embeddings_model
        self.vectorstore = None
    
    def create_vectorstore(self, documents: List[Document]) -> None:
        """Create FAISS vector store from documents"""
        print(f"Creating vector store from {len(documents)} documents...")
        
        self.vectorstore = FAISS.from_documents(
            documents=documents,
            embedding=self.embeddings
        )
        
        print("Vector store created successfully!")
    
    def add_documents(self, new_documents: List[Document]) -> None:
        """Add new documents to existing vector store"""
        if self.vectorstore is None:
            self.create_vectorstore(new_documents)
        else:
            self.vectorstore.add_documents(new_documents)
    
    def save_vectorstore(self, path: str) -> None:
        """Save vector store to disk"""
        if self.vectorstore:
            self.vectorstore.save_local(path)
            print(f"Vector store saved to {path}")
    
    def load_vectorstore(self, path: str) -> None:
        """Load vector store from disk"""
        self.vectorstore = FAISS.load_local(path, self.embeddings)
        print(f"Vector store loaded from {path}")
    
    def similarity_search(self, query: str, k: int = 5, max_distance: float = None) -> List[Document]:
        """Search for similar documents, optionally capping the distance score"""
        if not self.vectorstore:
            raise ValueError("Vector store not initialized")
        
        # FAISS returns distance scores, where LOWER means more similar
        docs_with_scores = self.vectorstore.similarity_search_with_score(query, k=k)
        
        # Optionally drop results that are too far from the query
        filtered_docs = [
            doc for doc, score in docs_with_scores 
            if max_distance is None or score <= max_distance
        ]
        
        return filtered_docs
    
    def get_relevant_documents_with_metadata(self, query: str, k: int = 5) -> List[dict]:
        """Get relevant documents with similarity scores and metadata"""
        if not self.vectorstore:
            raise ValueError("Vector store not initialized")
        
        docs_with_scores = self.vectorstore.similarity_search_with_score(query, k=k)
        
        results = []
        for doc, score in docs_with_scores:
            results.append({
                'content': doc.page_content,
                'metadata': doc.metadata,
                'similarity_score': score,
                'chunk_id': doc.metadata.get('chunk_id', 'unknown')
            })
        
        return results

# Usage
faiss_store = FAISSVectorStore(openai_embeddings)
faiss_store.create_vectorstore(chunks)

# Search for relevant documents
query = "How to implement machine learning models?"
relevant_docs = faiss_store.similarity_search(query, k=3)

for i, doc in enumerate(relevant_docs):
    print(f"Document {i+1}:")
    print(f"Content: {doc.page_content[:200]}...")
    print(f"Source: {doc.metadata.get('source', 'Unknown')}")
    print("---")

Chroma Vector Database

python
from langchain.vectorstores import Chroma
from langchain.schema import Document
from typing import List

class ChromaVectorStore:
    def __init__(self, embeddings_model, collection_name: str = "langchain_collection"):
        self.embeddings = embeddings_model
        self.collection_name = collection_name
        self.vectorstore = None
    
    def setup_chroma(self, persist_directory: str = "./chroma_db"):
        """Setup Chroma with persistence"""
        self.vectorstore = Chroma(
            collection_name=self.collection_name,
            embedding_function=self.embeddings,
            persist_directory=persist_directory
        )
    
    def add_documents_with_metadata_filtering(self, documents: List[Document]):
        """Add documents with enhanced metadata for filtering"""
        enhanced_docs = []
        
        for doc in documents:
            # Add searchable metadata
            enhanced_metadata = {
                **doc.metadata,
                'content_length': len(doc.page_content),
                'word_count': len(doc.page_content.split()),
                'has_code': 'def ' in doc.page_content or 'class ' in doc.page_content,
                'language': self._detect_language(doc.page_content)
            }
            
            enhanced_doc = Document(
                page_content=doc.page_content,
                metadata=enhanced_metadata
            )
            enhanced_docs.append(enhanced_doc)
        
        self.vectorstore.add_documents(enhanced_docs)
    
    def _detect_language(self, text: str) -> str:
        """Simple language detection"""
        code_keywords = ['def ', 'class ', 'import ', 'function', 'var ', 'const ']
        if any(keyword in text for keyword in code_keywords):
            return 'code'
        elif len(text.split()) > 10:
            return 'natural_language'
        else:
            return 'mixed'
    
    def search_with_filters(self, query: str, filter_criteria: dict = None, k: int = 5) -> List[Document]:
        """Search with metadata filters"""
        search_kwargs = {'k': k}
        
        if filter_criteria:
            search_kwargs['filter'] = filter_criteria
        
        return self.vectorstore.search(query, search_type="similarity", **search_kwargs)
    
    def get_collection_stats(self) -> dict:
        """Get statistics about the collection"""
        if not self.vectorstore:
            return {"error": "Vector store not initialized"}
        
        # Get collection info
        collection = self.vectorstore._collection
        return {
            'total_documents': collection.count(),
            'collection_name': self.collection_name
        }

# Usage
chroma_store = ChromaVectorStore(openai_embeddings)
chroma_store.setup_chroma("./my_chroma_db")
chroma_store.add_documents_with_metadata_filtering(chunks)

# Search with filters
filtered_results = chroma_store.search_with_filters(
    query="machine learning implementation",
    filter_criteria={"language": "natural_language"},
    k=3
)

πŸ” Retrieval Strategies ​

🎯 Advanced Retrieval Methods

Multi-Query Retrieval

python
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.llms import OpenAI
from langchain.schema import Document
from typing import List

class AdvancedRetriever:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
    
    def setup_multi_query_retriever(self) -> MultiQueryRetriever:
        """Setup retriever that generates multiple query variations"""
        retriever = MultiQueryRetriever.from_llm(
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
            llm=self.llm
        )
        return retriever
    
    def contextual_compression_retrieval(self, query: str) -> List[Document]:
        """Retrieve and compress results for relevance"""
        from langchain.retrievers import ContextualCompressionRetriever
        from langchain.retrievers.document_compressors import LLMChainExtractor
        
        # Setup compression
        compressor = LLMChainExtractor.from_llm(self.llm)
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=self.vectorstore.as_retriever()
        )
        
        return compression_retriever.get_relevant_documents(query)
    
    def ensemble_retrieval(self, query: str, k: int = 5) -> List[Document]:
        """Combine multiple retrieval strategies"""
        # Strategy 1: Similarity search
        similarity_docs = self.vectorstore.similarity_search(query, k=k)
        
        # Strategy 2: MMR (Maximum Marginal Relevance)
        mmr_docs = self.vectorstore.max_marginal_relevance_search(query, k=k)
        
        # Strategy 3: Similarity search keeping only close matches
        # (FAISS scores are distances, so LOWER means more similar)
        threshold_docs = self.vectorstore.similarity_search_with_score(query, k=k)
        threshold_docs = [doc for doc, score in threshold_docs if score <= 0.7]
        
        # Combine and deduplicate results
        all_docs = similarity_docs + mmr_docs + threshold_docs
        unique_docs = self._deduplicate_documents(all_docs)
        
        # Rank by relevance
        ranked_docs = self._rank_documents_by_relevance(unique_docs, query)
        
        return ranked_docs[:k]
    
    def _deduplicate_documents(self, documents: List[Document]) -> List[Document]:
        """Remove duplicate documents based on content"""
        seen_content = set()
        unique_docs = []
        
        for doc in documents:
            content_hash = hash(doc.page_content)
            if content_hash not in seen_content:
                seen_content.add(content_hash)
                unique_docs.append(doc)
        
        return unique_docs
    
    def _rank_documents_by_relevance(self, documents: List[Document], query: str) -> List[Document]:
        """Rank documents by multiple relevance factors"""
        scored_docs = []
        
        for doc in documents:
            score = self._calculate_relevance_score(doc, query)
            scored_docs.append((doc, score))
        
        # Sort by score (descending)
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        return [doc for doc, score in scored_docs]
    
    def _calculate_relevance_score(self, document: Document, query: str) -> float:
        """Calculate relevance score based on multiple factors"""
        content = document.page_content.lower()
        query_terms = query.lower().split()
        
        # Term frequency score
        term_score = sum(content.count(term) for term in query_terms) / len(query_terms)
        
        # Content length score (prefer moderate length)
        length_score = 1.0 - abs(len(content) - 1000) / 2000
        length_score = max(0, length_score)
        
        # Metadata quality score
        metadata_score = 0.5
        if 'source' in document.metadata:
            metadata_score += 0.2
        if 'chunk_id' in document.metadata:
            metadata_score += 0.2
        if 'section_title' in document.metadata:
            metadata_score += 0.1
        
        # Combined score
        total_score = (term_score * 0.5) + (length_score * 0.3) + (metadata_score * 0.2)
        return min(total_score, 1.0)

# Usage
llm = OpenAI(temperature=0)  # any LangChain LLM works here
advanced_retriever = AdvancedRetriever(faiss_store.vectorstore, llm)
multi_query_retriever = advanced_retriever.setup_multi_query_retriever()

# Use ensemble retrieval
query = "How to implement a neural network for image classification?"
ensemble_results = advanced_retriever.ensemble_retrieval(query, k=5)

Hierarchical Retrieval

python
from langchain.vectorstores import FAISS
from langchain.schema import Document
from typing import List

class HierarchicalRetriever:
    def __init__(self, documents: List[Document], embeddings):
        self.documents = documents
        self.embeddings = embeddings
        self.document_store = None
        self.chunk_store = None
        self._build_hierarchical_stores()
    
    def _build_hierarchical_stores(self):
        """Build document-level and chunk-level vector stores"""
        # Document-level summaries
        document_summaries = self._create_document_summaries()
        self.document_store = FAISS.from_documents(document_summaries, self.embeddings)
        
        # Chunk-level details
        self.chunk_store = FAISS.from_documents(self.documents, self.embeddings)
    
    def _create_document_summaries(self) -> List[Document]:
        """Create summaries for each source document"""
        summaries = []
        document_groups = {}
        
        # Group chunks by source document
        for doc in self.documents:
            source = doc.metadata.get('source', 'unknown')
            if source not in document_groups:
                document_groups[source] = []
            document_groups[source].append(doc)
        
        # Create summary for each document
        for source, chunks in document_groups.items():
            # Combine first few chunks as summary
            summary_content = ' '.join([chunk.page_content for chunk in chunks[:3]])
            
            summary_doc = Document(
                page_content=summary_content,
                metadata={
                    'source': source,
                    'type': 'document_summary',
                    'chunk_count': len(chunks)
                }
            )
            summaries.append(summary_doc)
        
        return summaries
    
    def hierarchical_search(self, query: str, max_docs: int = 2, chunks_per_doc: int = 3) -> List[Document]:
        """Perform hierarchical search: documents first, then chunks"""
        # Step 1: Find relevant documents
        relevant_docs = self.document_store.similarity_search(query, k=max_docs)
        
        # Step 2: For each relevant document, find specific chunks
        all_relevant_chunks = []
        
        for doc_summary in relevant_docs:
            source = doc_summary.metadata['source']
            
            # Filter chunks from this document
            source_chunks = [
                chunk for chunk in self.documents 
                if chunk.metadata.get('source') == source
            ]
            
            # Create temporary vector store for this document's chunks
            if source_chunks:
                doc_chunk_store = FAISS.from_documents(source_chunks, self.embeddings)
                relevant_chunks = doc_chunk_store.similarity_search(query, k=chunks_per_doc)
                all_relevant_chunks.extend(relevant_chunks)
        
        return all_relevant_chunks

# Usage
hierarchical_retriever = HierarchicalRetriever(chunks, openai_embeddings)
hierarchical_results = hierarchical_retriever.hierarchical_search(
    "machine learning model training",
    max_docs=2,
    chunks_per_doc=3
)

🤖 RAG Chain Implementation

🔗 Complete RAG Pipeline

python
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from typing import List

class RAGSystem:
    def __init__(self, llm, vectorstore, retriever_type="similarity"):
        self.llm = llm
        self.vectorstore = vectorstore
        self.retriever = self._setup_retriever(retriever_type)
        self.qa_chain = self._setup_qa_chain()
    
    def _setup_retriever(self, retriever_type: str):
        """Setup retriever based on type"""
        if retriever_type == "similarity":
            return self.vectorstore.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 4}
            )
        elif retriever_type == "mmr":
            return self.vectorstore.as_retriever(
                search_type="mmr",
                search_kwargs={"k": 4, "fetch_k": 8}
            )
        elif retriever_type == "threshold":
            return self.vectorstore.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"score_threshold": 0.5, "k": 4}
            )
    
    def _setup_qa_chain(self):
        """Setup Question-Answering chain with custom prompt"""
        prompt_template = """
        Use the following context to answer the question. If the answer is not in the context, say "I don't have enough information to answer this question."

        Context:
        {context}

        Question: {question}

        Instructions:
        1. Base your answer primarily on the provided context
        2. If you use information from multiple sources, mention this
        3. If the context is insufficient, clearly state this
        4. Provide specific examples from the context when relevant
        5. Keep your answer concise but comprehensive

        Answer:
        """
        
        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )
        
        return RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            chain_type_kwargs={"prompt": PROMPT},
            return_source_documents=True
        )
    
    def query(self, question: str) -> dict:
        """Query the RAG system"""
        result = self.qa_chain({"query": question})
        
        return {
            "answer": result["result"],
            "source_documents": result["source_documents"],
            "sources": [doc.metadata.get("source", "Unknown") for doc in result["source_documents"]]
        }
    
    def query_with_chat_history(self, question: str, chat_history: List[tuple] = None) -> dict:
        """Query with conversation history context"""
        if chat_history:
            # Add chat history to context
            history_context = "\n".join([
                f"Human: {human}\nAssistant: {ai}" 
                for human, ai in chat_history[-3:]  # Last 3 exchanges
            ])
            
            enhanced_question = f"""
            Previous conversation:
            {history_context}
            
            Current question: {question}
            """
        else:
            enhanced_question = question
        
        return self.query(enhanced_question)
    
    def get_system_stats(self) -> dict:
        """Get RAG system statistics"""
        return {
            # FAISS-specific: the index reports how many vectors it holds
            "total_documents": self.vectorstore.index.ntotal,
            "retriever_type": type(self.retriever).__name__,
            "llm_model": getattr(self.llm, 'model_name', 'Unknown')
        }

# Usage
rag_system = RAGSystem(llm, faiss_store.vectorstore, retriever_type="mmr")

# Simple query
result = rag_system.query("How do I implement a convolutional neural network?")
print("Answer:", result["answer"])
print("Sources:", result["sources"])

# Query with chat history
chat_history = [
    ("What is machine learning?", "Machine learning is a subset of AI that enables computers to learn from data."),
    ("What are the main types?", "The main types are supervised, unsupervised, and reinforcement learning.")
]

contextual_result = rag_system.query_with_chat_history(
    "Can you give me an example of supervised learning?",
    chat_history
)

🎯 Advanced RAG Patterns

Self-Querying RAG

python
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.schema import Document
from typing import List

class SelfQueryingRAG:
    def __init__(self, llm, vectorstore):
        self.llm = llm
        self.vectorstore = vectorstore
        self.metadata_field_info = self._define_metadata_fields()
        self.document_content_description = "Documentation about machine learning, programming, and AI"
        
    def _define_metadata_fields(self) -> List[AttributeInfo]:
        """Define metadata fields for self-querying"""
        return [
            AttributeInfo(
                name="source",
                description="The source file of the document",
                type="string"
            ),
            AttributeInfo(
                name="file_type",
                description="The type of file (pdf, txt, etc.)",
                type="string"
            ),
            AttributeInfo(
                name="section_title",
                description="The title of the document section",
                type="string"
            ),
            AttributeInfo(
                name="word_count",
                description="The number of words in the document chunk",
                type="integer"
            ),
            AttributeInfo(
                name="language",
                description="Whether the content is 'code', 'natural_language', or 'mixed'",
                type="string"
            )
        ]
    
    def setup_self_query_retriever(self):
        """Setup self-querying retriever"""
        return SelfQueryRetriever.from_llm(
            self.llm,
            self.vectorstore,
            self.document_content_description,
            self.metadata_field_info,
            verbose=True
        )
    
    def query_with_filters(self, question: str) -> List[Document]:
        """Query with automatic filter generation"""
        retriever = self.setup_self_query_retriever()
        return retriever.get_relevant_documents(question)

# Usage
self_querying_rag = SelfQueryingRAG(llm, chroma_store.vectorstore)

# The system will automatically generate filters based on the query
results = self_querying_rag.query_with_filters(
    "Show me code examples about neural networks from PDF documents"
)

Multi-Modal RAG

python
class MultiModalRAG:
    def __init__(self, llm, text_vectorstore, image_descriptions: dict = None):
        self.llm = llm
        self.text_vectorstore = text_vectorstore
        self.image_descriptions = image_descriptions or {}
    
    def add_image_descriptions(self, image_path: str, description: str):
        """Add description for an image"""
        self.image_descriptions[image_path] = description
    
    def search_mixed_content(self, query: str, include_images: bool = True) -> dict:
        """Search both text and image content"""
        results = {
            "text_results": [],
            "image_results": [],
            "combined_context": ""
        }
        
        # Search text content
        text_docs = self.text_vectorstore.similarity_search(query, k=3)
        results["text_results"] = text_docs
        
        # Search image descriptions if requested
        if include_images and self.image_descriptions:
            image_results = []
            query_lower = query.lower()
            
            for image_path, description in self.image_descriptions.items():
                if any(term in description.lower() for term in query_lower.split()):
                    image_results.append({
                        "image_path": image_path,
                        "description": description
                    })
            
            results["image_results"] = image_results
        
        # Combine context
        text_context = "\n\n".join([doc.page_content for doc in text_docs])
        image_context = "\n\n".join([
            f"Image: {img['image_path']}\nDescription: {img['description']}"
            for img in results["image_results"]
        ])
        
        results["combined_context"] = f"{text_context}\n\n{image_context}"
        
        return results
    
    def generate_answer_with_mixed_content(self, query: str) -> str:
        """Generate answer using both text and image content"""
        mixed_results = self.search_mixed_content(query)
        
        prompt = f"""
        Answer the following question using the provided text and image information:
        
        Question: {query}
        
        Retrieved content (text and image descriptions):
        {mixed_results['combined_context']}
        
        Provide a comprehensive answer that references both text and visual information when relevant.
        """
        
        return self.llm.predict(prompt)

# Usage
multimodal_rag = MultiModalRAG(llm, faiss_store.vectorstore)

# Add image descriptions
multimodal_rag.add_image_descriptions(
    "neural_network_diagram.png",
    "Diagram showing a neural network with input layer, hidden layers, and output layer"
)

# Search with mixed content
mixed_answer = multimodal_rag.generate_answer_with_mixed_content(
    "How does information flow through a neural network?"
)

🚀 Getting Started with RAG

📚 Quick Implementation

python
# Complete RAG setup in minimal code
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

# 1. Load documents
loader = TextLoader("your_document.txt")
documents = loader.load()

# 2. Split into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_documents(documents)

# 3. Create embeddings and vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(chunks, embeddings)

# 4. Setup LLM and RAG chain
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)

# 5. Query the system
answer = qa_chain.run("Your question here")
print(answer)

🎯 Next Steps

  1. Experiment with Chunking: Try different splitting strategies for your documents
  2. Optimize Retrieval: Test various retrieval methods and parameters
  3. Custom Prompts: Create domain-specific prompt templates
  4. Add Metadata: Enhance documents with rich metadata for better filtering
  5. Monitor Performance: Track retrieval quality and response accuracy (a minimal hit-rate sketch follows below)
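
For step 5, a simple starting point is hit rate at k: the fraction of test questions for which the expected source document shows up in the top-k retrieved chunks. This is a minimal sketch; eval_pairs and the source names are placeholders you would replace with labeled questions from your own corpus.

python
from typing import List, Tuple

def hit_rate_at_k(vectorstore, eval_pairs: List[Tuple[str, str]], k: int = 4) -> float:
    """Fraction of queries whose expected source appears in the top-k results"""
    hits = 0
    for question, expected_source in eval_pairs:
        results = vectorstore.similarity_search(question, k=k)
        sources = {doc.metadata.get("source") for doc in results}
        if expected_source in sources:
            hits += 1
    return hits / len(eval_pairs) if eval_pairs else 0.0

# Hypothetical labeled examples -- replace with questions about your own documents
eval_pairs = [
    ("How do I implement a convolutional neural network?", "ml_guide.pdf"),
    ("What does chunk overlap do?", "rag_notes.txt"),
]
print(f"Hit rate @ 4: {hit_rate_at_k(vectorstore, eval_pairs):.2f}")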

📖 Additional Resources

  • Vector Database Comparison: Choosing the right vector store
  • Embedding Model Benchmarks: Performance and cost analysis
  • Production RAG Patterns: Scalable architectures and best practices
  • RAG Evaluation Methods: Measuring retrieval and generation quality

Master RAG to build intelligent AI systems that can access and utilize vast knowledge bases to provide accurate, contextual, and up-to-date information.
