Retrieval Augmented Generation (RAG) - Knowledge-Enhanced AI
Build AI systems that combine the power of language models with external knowledge sources
What is Retrieval Augmented Generation (RAG)?
RAG (Retrieval Augmented Generation) is a powerful technique that enhances language models by providing them with relevant external information during the generation process. Instead of relying solely on training data, RAG systems can access and utilize up-to-date, domain-specific knowledge from external sources.
Simple Analogy: Think of RAG as giving an AI assistant access to a vast library - instead of only answering from memory, it can quickly look up relevant books, articles, and documents to provide more accurate, current, and detailed responses.
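At its core, the pattern is retrieve-then-generate: fetch the passages most relevant to the user's question, place them in the prompt, and let the model answer from that context. Below is a minimal sketch of that loop; retriever and llm are placeholders for the concrete LangChain components built later in this guide.
def rag_answer(question: str, retriever, llm, k: int = 4) -> str:
    # 1. Retrieve the k chunks most relevant to the question
    docs = retriever.get_relevant_documents(question)[:k]
    # 2. Assemble the retrieved text into the prompt as grounding context
    context = "\n\n".join(doc.page_content for doc in docs)
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {question}"
    # 3. Generate an answer conditioned on the retrieved context
    return llm.predict(prompt)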
RAG Architecture Overview
RAG SYSTEM ARCHITECTURE (Knowledge-Enhanced Generation)
Indexing pipeline: the knowledge base of external information is prepared for retrieval.
LOAD DOCUMENTS (PDFs, websites, APIs) -> SPLIT TEXT (smart chunking, context-aware) -> EMBED CHUNKS (semantic vectors) -> STORE VECTORS (vector database index)
Generation pipeline: at query time, retrieval grounds the model's answer.
USER QUERY (user question) -> SEARCH RELEVANT DOCS -> PROMPT ASSEMBLY (question + retrieved context) -> ANSWER WITH CONTEXT
Document Loading & Processing
Multi-Format Document Loaders
Loading documents from various sources and formats
Basic Document Loading
from langchain.document_loaders import (
PyPDFLoader,
TextLoader,
WebBaseLoader,
CSVLoader,
JSONLoader
)
# PDF Document Loading
pdf_loader = PyPDFLoader("path/to/document.pdf")
pdf_documents = pdf_loader.load()
# Web Content Loading
web_loader = WebBaseLoader("https://example.com/article")
web_documents = web_loader.load()
# Text File Loading
text_loader = TextLoader("path/to/file.txt")
text_documents = text_loader.load()
# CSV Data Loading
csv_loader = CSVLoader("path/to/data.csv")
csv_documents = csv_loader.load()
print(f"Loaded {len(pdf_documents)} PDF pages")
print(f"First document preview: {pdf_documents[0].page_content[:200]}...")Advanced Document Loading with Metadata β
from langchain.document_loaders import DirectoryLoader
from langchain.schema import Document
from typing import List
import os
from datetime import datetime
class EnhancedDocumentLoader:
def __init__(self, base_directory: str):
self.base_directory = base_directory
self.supported_extensions = {
'.pdf': PyPDFLoader,
'.txt': TextLoader,
'.csv': CSVLoader,
'.json': JSONLoader
}
def load_documents_with_metadata(self) -> List[Document]:
"""Load all documents with enhanced metadata"""
documents = []
for root, dirs, files in os.walk(self.base_directory):
for file in files:
file_path = os.path.join(root, file)
file_ext = os.path.splitext(file)[1].lower()
if file_ext in self.supported_extensions:
try:
loader_class = self.supported_extensions[file_ext]
loader = loader_class(file_path)
file_docs = loader.load()
# Add enhanced metadata
for doc in file_docs:
doc.metadata.update({
'file_path': file_path,
'file_name': file,
'file_type': file_ext,
'file_size': os.path.getsize(file_path),
'created_date': datetime.fromtimestamp(os.path.getctime(file_path)),
'modified_date': datetime.fromtimestamp(os.path.getmtime(file_path)),
'directory': os.path.dirname(file_path)
})
documents.extend(file_docs)
except Exception as e:
print(f"Error loading {file_path}: {e}")
return documents
def filter_documents_by_criteria(self, documents: List[Document], criteria: dict) -> List[Document]:
"""Filter documents based on metadata criteria"""
filtered_docs = []
for doc in documents:
meets_criteria = True
for key, value in criteria.items():
if key not in doc.metadata:
meets_criteria = False
break
if isinstance(value, str) and value.lower() not in str(doc.metadata[key]).lower():
meets_criteria = False
break
elif isinstance(value, (int, float)) and doc.metadata[key] < value:  # numeric criteria act as minimum thresholds
meets_criteria = False
break
if meets_criteria:
filtered_docs.append(doc)
return filtered_docs
# Usage
enhanced_loader = EnhancedDocumentLoader("./documents")
all_documents = enhanced_loader.load_documents_with_metadata()
# Filter for recent PDF documents
recent_pdfs = enhanced_loader.filter_documents_by_criteria(
all_documents,
{'file_type': '.pdf', 'file_size': 1000} # PDFs larger than 1KB
)
Web Scraping with Custom Processing
from langchain.document_loaders import AsyncHtmlLoader
from langchain.document_transformers import Html2TextTransformer
import asyncio
class WebContentLoader:
def __init__(self):
self.html_transformer = Html2TextTransformer()
async def load_multiple_urls(self, urls: List[str]) -> List[Document]:
"""Load content from multiple URLs asynchronously"""
loader = AsyncHtmlLoader(urls)
html_documents = await loader.aload()
# Transform HTML to clean text
text_documents = self.html_transformer.transform_documents(html_documents)
# Add web-specific metadata
for i, doc in enumerate(text_documents):
doc.metadata.update({
'source_url': urls[i],
'content_type': 'web',
'scraped_date': datetime.now(),
'word_count': len(doc.page_content.split())
})
return text_documents
def clean_web_content(self, documents: List[Document]) -> List[Document]:
"""Clean and process web content"""
cleaned_docs = []
for doc in documents:
# Remove extra whitespace and newlines
cleaned_content = ' '.join(doc.page_content.split())
# Remove common web artifacts
artifacts = ['Cookie Policy', 'Privacy Policy', 'Subscribe to Newsletter']
for artifact in artifacts:
cleaned_content = cleaned_content.replace(artifact, '')
# Create new document with cleaned content
cleaned_doc = Document(
page_content=cleaned_content,
metadata=doc.metadata
)
cleaned_docs.append(cleaned_doc)
return cleaned_docs
# Usage
web_loader = WebContentLoader()
urls = [
"https://example.com/article1",
"https://example.com/article2",
"https://example.com/article3"
]
# Load and clean web content (run the async loader from synchronous code)
web_docs = asyncio.run(web_loader.load_multiple_urls(urls))
cleaned_docs = web_loader.clean_web_content(web_docs)
Text Splitting & Chunking
Smart Chunking Strategies
Optimizing document chunks for effective retrieval
Basic Text Splitting
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter
)
# Recursive Character Splitter (Recommended)
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
# Split documents
chunks = recursive_splitter.split_documents(documents)
print(f"Created {len(chunks)} chunks from {len(documents)} documents")
# Token-based Splitter
token_splitter = TokenTextSplitter(
chunk_size=512,
chunk_overlap=50
)
token_chunks = token_splitter.split_documents(documents)
Advanced Semantic Chunking
from langchain.schema import Document
import re
from typing import List
class SemanticChunker:
def __init__(self, target_chunk_size: int = 1000, overlap_size: int = 200):
self.target_chunk_size = target_chunk_size
self.overlap_size = overlap_size
def chunk_by_sections(self, documents: List[Document]) -> List[Document]:
"""Chunk documents by logical sections (headers, paragraphs)"""
chunked_docs = []
for doc in documents:
sections = self._identify_sections(doc.page_content)
for i, section in enumerate(sections):
if len(section['content']) > self.target_chunk_size:
# Further split large sections
subsections = self._split_large_section(section['content'])
for j, subsection in enumerate(subsections):
chunk_doc = Document(
page_content=subsection,
metadata={
**doc.metadata,
'chunk_id': f"{doc.metadata.get('file_name', 'doc')}_{i}_{j}",
'section_title': section['title'],
'chunk_type': 'subsection'
}
)
chunked_docs.append(chunk_doc)
else:
chunk_doc = Document(
page_content=section['content'],
metadata={
**doc.metadata,
'chunk_id': f"{doc.metadata.get('file_name', 'doc')}_{i}",
'section_title': section['title'],
'chunk_type': 'section'
}
)
chunked_docs.append(chunk_doc)
return chunked_docs
def _identify_sections(self, text: str) -> List[dict]:
"""Identify logical sections in text"""
sections = []
# Split by headers (markdown-style)
header_pattern = r'^#{1,6}\s+(.+)$'
lines = text.split('\n')
current_section = {'title': 'Introduction', 'content': ''}
for line in lines:
header_match = re.match(header_pattern, line)
if header_match:
# Save previous section
if current_section['content'].strip():
sections.append(current_section)
# Start new section
current_section = {
'title': header_match.group(1),
'content': ''
}
else:
current_section['content'] += line + '\n'
# Add final section
if current_section['content'].strip():
sections.append(current_section)
return sections
def _split_large_section(self, content: str) -> List[str]:
"""Split large sections into smaller chunks"""
# Split by paragraphs first
paragraphs = content.split('\n\n')
chunks = []
current_chunk = ""
for paragraph in paragraphs:
if len(current_chunk + paragraph) <= self.target_chunk_size:
current_chunk += paragraph + '\n\n'
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = paragraph + '\n\n'
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
def add_contextual_overlap(self, chunks: List[Document]) -> List[Document]:
"""Add contextual overlap between chunks"""
enhanced_chunks = []
for i, chunk in enumerate(chunks):
content = chunk.page_content
# Add context from previous chunk
if i > 0 and chunks[i-1].metadata.get('file_name') == chunk.metadata.get('file_name'):
prev_content = chunks[i-1].page_content
overlap_text = prev_content[-self.overlap_size:] if len(prev_content) > self.overlap_size else prev_content
content = f"[Previous context: ...{overlap_text}]\n\n{content}"
# Add context from next chunk
if i < len(chunks) - 1 and chunks[i+1].metadata.get('file_name') == chunk.metadata.get('file_name'):
next_content = chunks[i+1].page_content
overlap_text = next_content[:self.overlap_size] if len(next_content) > self.overlap_size else next_content
content = f"{content}\n\n[Next context: {overlap_text}...]"
enhanced_chunk = Document(
page_content=content,
metadata={
**chunk.metadata,
'has_context_overlap': True,
'chunk_position': i,
'total_chunks': len(chunks)
}
)
enhanced_chunks.append(enhanced_chunk)
return enhanced_chunks
# Usage
semantic_chunker = SemanticChunker(target_chunk_size=800, overlap_size=150)
semantic_chunks = semantic_chunker.chunk_by_sections(documents)
enhanced_chunks = semantic_chunker.add_contextual_overlap(semantic_chunks)
Domain-Specific Chunking
class CodeDocumentChunker:
"""Specialized chunker for code documentation"""
def __init__(self):
self.code_patterns = {
'function': r'def\s+\w+\([^)]*\):',
'class': r'class\s+\w+.*:',
'import': r'(?:import|from)\s+\w+',
'comment': r'#.*$|""".*?"""',
}
def chunk_code_documentation(self, documents: List[Document]) -> List[Document]:
"""Chunk code documentation preserving logical structure"""
chunked_docs = []
for doc in documents:
if self._is_code_document(doc):
chunks = self._chunk_by_code_structure(doc)
else:
chunks = self._chunk_by_markdown_structure(doc)
chunked_docs.extend(chunks)
return chunked_docs
def _is_code_document(self, doc: Document) -> bool:
"""Determine if document contains code"""
content = doc.page_content
code_indicators = ['def ', 'class ', 'import ', 'function(', '```python', '```javascript']
return any(indicator in content for indicator in code_indicators)
def _chunk_by_code_structure(self, doc: Document) -> List[Document]:
"""Chunk by code structure (functions, classes, etc.)"""
chunks = []
lines = doc.page_content.split('\n')
current_chunk_lines = []
current_function_or_class = None
for line in lines:
# Check for function or class definition
if re.match(r'(def|class)\s+\w+', line.strip()):
# Save previous chunk
if current_chunk_lines:
chunk_content = '\n'.join(current_chunk_lines)
chunks.append(Document(
page_content=chunk_content,
metadata={
**doc.metadata,
'chunk_type': 'code_block',
'function_or_class': current_function_or_class
}
))
# Start new chunk
current_chunk_lines = [line]
current_function_or_class = line.strip()
else:
current_chunk_lines.append(line)
# Add final chunk
if current_chunk_lines:
chunk_content = '\n'.join(current_chunk_lines)
chunks.append(Document(
page_content=chunk_content,
metadata={
**doc.metadata,
'chunk_type': 'code_block',
'function_or_class': current_function_or_class
}
))
return chunks
# Usage
code_chunker = CodeDocumentChunker()
code_chunks = code_chunker.chunk_code_documentation(documents)
Vector Embeddings & Storage
Embedding Generation
Converting text chunks into semantic vector representations
Basic Embedding Setup
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma, Pinecone
import numpy as np
# OpenAI Embeddings (Recommended for quality)
openai_embeddings = OpenAIEmbeddings(
model="text-embedding-ada-002",
openai_api_key="your-api-key"
)
# Open Source Alternative
huggingface_embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
# Test embedding generation
sample_text = "LangChain is a framework for building applications with LLMs"
embedding_vector = openai_embeddings.embed_query(sample_text)
print(f"Embedding dimension: {len(embedding_vector)}")
print(f"First 5 values: {embedding_vector[:5]}")Custom Embedding Pipeline β
from sentence_transformers import SentenceTransformer
import torch
class CustomEmbeddingGenerator:
def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = None):
self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
self.model = SentenceTransformer(model_name)
self.model.to(self.device)
def embed_documents(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
"""Generate embeddings for multiple documents"""
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
batch_embeddings = self.model.encode(
batch,
convert_to_tensor=True,
device=self.device,
show_progress_bar=True
)
embeddings.extend(batch_embeddings.cpu().numpy())
return embeddings
def embed_query(self, text: str) -> List[float]:
"""Generate embedding for a single query"""
embedding = self.model.encode(
[text],
convert_to_tensor=True,
device=self.device
)
return embedding.cpu().numpy()[0]
def calculate_similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
"""Calculate cosine similarity between two embeddings"""
vec1 = np.array(embedding1)
vec2 = np.array(embedding2)
dot_product = np.dot(vec1, vec2)
norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
return dot_product / norm_product if norm_product != 0 else 0.0
def get_model_info(self) -> dict:
"""Get information about the embedding model"""
return {
'model_name': self.model._modules['0'].auto_model.name_or_path,
'embedding_dimension': self.model.get_sentence_embedding_dimension(),
'max_sequence_length': self.model.max_seq_length,
'device': str(self.device)
}
# Usage
custom_embedder = CustomEmbeddingGenerator()
model_info = custom_embedder.get_model_info()
print(f"Model info: {model_info}")
# Generate embeddings for document chunks
chunk_texts = [chunk.page_content for chunk in chunks[:10]] # First 10 chunks
chunk_embeddings = custom_embedder.embed_documents(chunk_texts)
Vector Database Integration
FAISS Vector Store
from langchain.vectorstores import FAISS
import pickle
class FAISSVectorStore:
def __init__(self, embeddings_model):
self.embeddings = embeddings_model
self.vectorstore = None
def create_vectorstore(self, documents: List[Document]) -> None:
"""Create FAISS vector store from documents"""
print(f"Creating vector store from {len(documents)} documents...")
self.vectorstore = FAISS.from_documents(
documents=documents,
embedding=self.embeddings
)
print("Vector store created successfully!")
def add_documents(self, new_documents: List[Document]) -> None:
"""Add new documents to existing vector store"""
if self.vectorstore is None:
self.create_vectorstore(new_documents)
else:
self.vectorstore.add_documents(new_documents)
def save_vectorstore(self, path: str) -> None:
"""Save vector store to disk"""
if self.vectorstore:
self.vectorstore.save_local(path)
print(f"Vector store saved to {path}")
def load_vectorstore(self, path: str) -> None:
"""Load vector store from disk"""
self.vectorstore = FAISS.load_local(path, self.embeddings)
print(f"Vector store loaded from {path}")
def similarity_search(self, query: str, k: int = 5, score_threshold: float = 0.0) -> List[Document]:
"""Search for similar documents"""
if not self.vectorstore:
raise ValueError("Vector store not initialized")
# Search with scores (FAISS returns L2 distances, where lower means more similar)
docs_with_scores = self.vectorstore.similarity_search_with_score(query, k=k)
# Keep documents within the distance threshold (0.0 disables filtering)
filtered_docs = [
doc for doc, score in docs_with_scores
if score_threshold == 0.0 or score <= score_threshold
]
return filtered_docs
def get_relevant_documents_with_metadata(self, query: str, k: int = 5) -> List[dict]:
"""Get relevant documents with similarity scores and metadata"""
if not self.vectorstore:
raise ValueError("Vector store not initialized")
docs_with_scores = self.vectorstore.similarity_search_with_score(query, k=k)
results = []
for doc, score in docs_with_scores:
results.append({
'content': doc.page_content,
'metadata': doc.metadata,
'similarity_score': score,
'chunk_id': doc.metadata.get('chunk_id', 'unknown')
})
return results
# Usage
faiss_store = FAISSVectorStore(openai_embeddings)
faiss_store.create_vectorstore(chunks)
# Search for relevant documents
query = "How to implement machine learning models?"
relevant_docs = faiss_store.similarity_search(query, k=3)
for i, doc in enumerate(relevant_docs):
print(f"Document {i+1}:")
print(f"Content: {doc.page_content[:200]}...")
print(f"Source: {doc.metadata.get('source', 'Unknown')}")
print("---")Chroma Vector Database β
from langchain.vectorstores import Chroma
import chromadb
class ChromaVectorStore:
def __init__(self, embeddings_model, collection_name: str = "langchain_collection"):
self.embeddings = embeddings_model
self.collection_name = collection_name
self.vectorstore = None
def setup_chroma(self, persist_directory: str = "./chroma_db"):
"""Setup Chroma with persistence"""
self.vectorstore = Chroma(
collection_name=self.collection_name,
embedding_function=self.embeddings,
persist_directory=persist_directory
)
def add_documents_with_metadata_filtering(self, documents: List[Document]):
"""Add documents with enhanced metadata for filtering"""
enhanced_docs = []
for doc in documents:
# Add searchable metadata
enhanced_metadata = {
**doc.metadata,
'content_length': len(doc.page_content),
'word_count': len(doc.page_content.split()),
'has_code': 'def ' in doc.page_content or 'class ' in doc.page_content,
'language': self._detect_language(doc.page_content)
}
enhanced_doc = Document(
page_content=doc.page_content,
metadata=enhanced_metadata
)
enhanced_docs.append(enhanced_doc)
self.vectorstore.add_documents(enhanced_docs)
def _detect_language(self, text: str) -> str:
"""Simple language detection"""
code_keywords = ['def ', 'class ', 'import ', 'function', 'var ', 'const ']
if any(keyword in text for keyword in code_keywords):
return 'code'
elif len(text.split()) > 10:
return 'natural_language'
else:
return 'mixed'
def search_with_filters(self, query: str, filter_criteria: dict = None, k: int = 5) -> List[Document]:
"""Search with metadata filters"""
search_kwargs = {'k': k}
if filter_criteria:
search_kwargs['filter'] = filter_criteria
return self.vectorstore.search(query, search_type="similarity", **search_kwargs)
def get_collection_stats(self) -> dict:
"""Get statistics about the collection"""
if not self.vectorstore:
return {"error": "Vector store not initialized"}
# Get collection info
collection = self.vectorstore._collection
return {
'total_documents': collection.count(),
'collection_name': self.collection_name
}
# Usage
chroma_store = ChromaVectorStore(openai_embeddings)
chroma_store.setup_chroma("./my_chroma_db")
chroma_store.add_documents_with_metadata_filtering(chunks)
# Search with filters
filtered_results = chroma_store.search_with_filters(
query="machine learning implementation",
filter_criteria={"language": "natural_language"},
k=3
)
Retrieval Strategies
Advanced Retrieval Methods
Multi-Query Retrieval
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.llms import OpenAI
class AdvancedRetriever:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def setup_multi_query_retriever(self) -> MultiQueryRetriever:
"""Setup retriever that generates multiple query variations"""
retriever = MultiQueryRetriever.from_llm(
retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
llm=self.llm
)
return retriever
def contextual_compression_retrieval(self, query: str) -> List[Document]:
"""Retrieve and compress results for relevance"""
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# Setup compression
compressor = LLMChainExtractor.from_llm(self.llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=self.vectorstore.as_retriever()
)
return compression_retriever.get_relevant_documents(query)
def ensemble_retrieval(self, query: str, k: int = 5) -> List[Document]:
"""Combine multiple retrieval strategies"""
# Strategy 1: Similarity search
similarity_docs = self.vectorstore.similarity_search(query, k=k)
# Strategy 2: MMR (Maximum Marginal Relevance)
mmr_docs = self.vectorstore.max_marginal_relevance_search(query, k=k)
# Strategy 3: Similarity search with a score cut-off
# (FAISS returns L2 distances where lower means more similar, so keep documents
# below the cut-off; flip the comparison for stores that return similarities)
threshold_docs = self.vectorstore.similarity_search_with_score(query, k=k)
threshold_docs = [doc for doc, score in threshold_docs if score < 0.7]
# Combine and deduplicate results
all_docs = similarity_docs + mmr_docs + threshold_docs
unique_docs = self._deduplicate_documents(all_docs)
# Rank by relevance
ranked_docs = self._rank_documents_by_relevance(unique_docs, query)
return ranked_docs[:k]
def _deduplicate_documents(self, documents: List[Document]) -> List[Document]:
"""Remove duplicate documents based on content"""
seen_content = set()
unique_docs = []
for doc in documents:
content_hash = hash(doc.page_content)
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
return unique_docs
def _rank_documents_by_relevance(self, documents: List[Document], query: str) -> List[Document]:
"""Rank documents by multiple relevance factors"""
scored_docs = []
for doc in documents:
score = self._calculate_relevance_score(doc, query)
scored_docs.append((doc, score))
# Sort by score (descending)
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, score in scored_docs]
def _calculate_relevance_score(self, document: Document, query: str) -> float:
"""Calculate relevance score based on multiple factors"""
content = document.page_content.lower()
query_terms = query.lower().split()
# Term frequency score
term_score = sum(content.count(term) for term in query_terms) / len(query_terms)
# Content length score (prefer moderate length)
length_score = 1.0 - abs(len(content) - 1000) / 2000
length_score = max(0, length_score)
# Metadata quality score
metadata_score = 0.5
if 'source' in document.metadata:
metadata_score += 0.2
if 'chunk_id' in document.metadata:
metadata_score += 0.2
if 'section_title' in document.metadata:
metadata_score += 0.1
# Combined score
total_score = (term_score * 0.5) + (length_score * 0.3) + (metadata_score * 0.2)
return min(total_score, 1.0)
# Usage
advanced_retriever = AdvancedRetriever(faiss_store.vectorstore, llm)
multi_query_retriever = advanced_retriever.setup_multi_query_retriever()
# Use ensemble retrieval
query = "How to implement a neural network for image classification?"
ensemble_results = advanced_retriever.ensemble_retrieval(query, k=5)
Hierarchical Retrieval
class HierarchicalRetriever:
def __init__(self, documents: List[Document], embeddings):
self.documents = documents
self.embeddings = embeddings
self.document_store = None
self.chunk_store = None
self._build_hierarchical_stores()
def _build_hierarchical_stores(self):
"""Build document-level and chunk-level vector stores"""
# Document-level summaries
document_summaries = self._create_document_summaries()
self.document_store = FAISS.from_documents(document_summaries, self.embeddings)
# Chunk-level details
self.chunk_store = FAISS.from_documents(self.documents, self.embeddings)
def _create_document_summaries(self) -> List[Document]:
"""Create summaries for each source document"""
summaries = []
document_groups = {}
# Group chunks by source document
for doc in self.documents:
source = doc.metadata.get('source', 'unknown')
if source not in document_groups:
document_groups[source] = []
document_groups[source].append(doc)
# Create summary for each document
for source, chunks in document_groups.items():
# Combine first few chunks as summary
summary_content = ' '.join([chunk.page_content for chunk in chunks[:3]])
summary_doc = Document(
page_content=summary_content,
metadata={
'source': source,
'type': 'document_summary',
'chunk_count': len(chunks)
}
)
summaries.append(summary_doc)
return summaries
def hierarchical_search(self, query: str, max_docs: int = 2, chunks_per_doc: int = 3) -> List[Document]:
"""Perform hierarchical search: documents first, then chunks"""
# Step 1: Find relevant documents
relevant_docs = self.document_store.similarity_search(query, k=max_docs)
# Step 2: For each relevant document, find specific chunks
all_relevant_chunks = []
for doc_summary in relevant_docs:
source = doc_summary.metadata['source']
# Filter chunks from this document
source_chunks = [
chunk for chunk in self.documents
if chunk.metadata.get('source') == source
]
# Create temporary vector store for this document's chunks
if source_chunks:
doc_chunk_store = FAISS.from_documents(source_chunks, self.embeddings)
relevant_chunks = doc_chunk_store.similarity_search(query, k=chunks_per_doc)
all_relevant_chunks.extend(relevant_chunks)
return all_relevant_chunks
# Usage
hierarchical_retriever = HierarchicalRetriever(chunks, openai_embeddings)
hierarchical_results = hierarchical_retriever.hierarchical_search(
"machine learning model training",
max_docs=2,
chunks_per_doc=3
)
RAG Chain Implementation
Complete RAG Pipeline
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
class RAGSystem:
def __init__(self, llm, vectorstore, retriever_type="similarity"):
self.llm = llm
self.vectorstore = vectorstore
self.retriever = self._setup_retriever(retriever_type)
self.qa_chain = self._setup_qa_chain()
def _setup_retriever(self, retriever_type: str):
"""Setup retriever based on type"""
if retriever_type == "similarity":
return self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
elif retriever_type == "mmr":
return self.vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 4, "fetch_k": 8}
)
elif retriever_type == "threshold":
return self.vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.5, "k": 4}
)
def _setup_qa_chain(self):
"""Setup Question-Answering chain with custom prompt"""
prompt_template = """
Use the following context to answer the question. If the answer is not in the context, say "I don't have enough information to answer this question."
Context:
{context}
Question: {question}
Instructions:
1. Base your answer primarily on the provided context
2. If you use information from multiple sources, mention this
3. If the context is insufficient, clearly state this
4. Provide specific examples from the context when relevant
5. Keep your answer concise but comprehensive
Answer:
"""
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
return RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.retriever,
chain_type_kwargs={"prompt": PROMPT},
return_source_documents=True
)
def query(self, question: str) -> dict:
"""Query the RAG system"""
result = self.qa_chain({"query": question})
return {
"answer": result["result"],
"source_documents": result["source_documents"],
"sources": [doc.metadata.get("source", "Unknown") for doc in result["source_documents"]]
}
def query_with_chat_history(self, question: str, chat_history: List[tuple] = None) -> dict:
"""Query with conversation history context"""
if chat_history:
# Add chat history to context
history_context = "\n".join([
f"Human: {human}\nAssistant: {ai}"
for human, ai in chat_history[-3:] # Last 3 exchanges
])
enhanced_question = f"""
Previous conversation:
{history_context}
Current question: {question}
"""
else:
enhanced_question = question
return self.query(enhanced_question)
def get_system_stats(self) -> dict:
"""Get RAG system statistics"""
return {
"total_documents": len(self.vectorstore.docstore.search("*")),
"retriever_type": type(self.retriever).__name__,
"llm_model": getattr(self.llm, 'model_name', 'Unknown')
}
# Usage
rag_system = RAGSystem(llm, faiss_store.vectorstore, retriever_type="mmr")
# Simple query
result = rag_system.query("How do I implement a convolutional neural network?")
print("Answer:", result["answer"])
print("Sources:", result["sources"])
# Query with chat history
chat_history = [
("What is machine learning?", "Machine learning is a subset of AI that enables computers to learn from data."),
("What are the main types?", "The main types are supervised, unsupervised, and reinforcement learning.")
]
contextual_result = rag_system.query_with_chat_history(
"Can you give me an example of supervised learning?",
chat_history
)
Advanced RAG Patterns
Self-Querying RAG
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
class SelfQueryingRAG:
def __init__(self, llm, vectorstore):
self.llm = llm
self.vectorstore = vectorstore
self.metadata_field_info = self._define_metadata_fields()
self.document_content_description = "Documentation about machine learning, programming, and AI"
def _define_metadata_fields(self) -> List[AttributeInfo]:
"""Define metadata fields for self-querying"""
return [
AttributeInfo(
name="source",
description="The source file of the document",
type="string"
),
AttributeInfo(
name="file_type",
description="The type of file (pdf, txt, etc.)",
type="string"
),
AttributeInfo(
name="section_title",
description="The title of the document section",
type="string"
),
AttributeInfo(
name="word_count",
description="The number of words in the document chunk",
type="integer"
),
AttributeInfo(
name="language",
description="Whether the content is 'code', 'natural_language', or 'mixed'",
type="string"
)
]
def setup_self_query_retriever(self):
"""Setup self-querying retriever"""
return SelfQueryRetriever.from_llm(
self.llm,
self.vectorstore,
self.document_content_description,
self.metadata_field_info,
verbose=True
)
def query_with_filters(self, question: str) -> List[Document]:
"""Query with automatic filter generation"""
retriever = self.setup_self_query_retriever()
return retriever.get_relevant_documents(question)
# Usage
self_querying_rag = SelfQueryingRAG(llm, chroma_store.vectorstore)
# The system will automatically generate filters based on the query
results = self_querying_rag.query_with_filters(
"Show me code examples about neural networks from PDF documents"
)
Multi-Modal RAG
class MultiModalRAG:
def __init__(self, llm, text_vectorstore, image_descriptions: dict = None):
self.llm = llm
self.text_vectorstore = text_vectorstore
self.image_descriptions = image_descriptions or {}
def add_image_descriptions(self, image_path: str, description: str):
"""Add description for an image"""
self.image_descriptions[image_path] = description
def search_mixed_content(self, query: str, include_images: bool = True) -> dict:
"""Search both text and image content"""
results = {
"text_results": [],
"image_results": [],
"combined_context": ""
}
# Search text content
text_docs = self.text_vectorstore.similarity_search(query, k=3)
results["text_results"] = text_docs
# Search image descriptions if requested
if include_images and self.image_descriptions:
image_results = []
query_lower = query.lower()
for image_path, description in self.image_descriptions.items():
if any(term in description.lower() for term in query_lower.split()):
image_results.append({
"image_path": image_path,
"description": description
})
results["image_results"] = image_results
# Combine context
text_context = "\n\n".join([doc.page_content for doc in text_docs])
image_context = "\n\n".join([
f"Image: {img['image_path']}\nDescription: {img['description']}"
for img in results["image_results"]
])
results["combined_context"] = f"{text_context}\n\n{image_context}"
return results
def generate_answer_with_mixed_content(self, query: str) -> str:
"""Generate answer using both text and image content"""
mixed_results = self.search_mixed_content(query)
prompt = f"""
Answer the following question using the provided text and image information:
Question: {query}
Text Content:
{mixed_results['combined_context']}
Provide a comprehensive answer that references both text and visual information when relevant.
"""
return self.llm.predict(prompt)
# Usage
multimodal_rag = MultiModalRAG(llm, faiss_store.vectorstore)
# Add image descriptions
multimodal_rag.add_image_descriptions(
"neural_network_diagram.png",
"Diagram showing a neural network with input layer, hidden layers, and output layer"
)
# Search with mixed content
mixed_answer = multimodal_rag.generate_answer_with_mixed_content(
"How does information flow through a neural network?"
)
Getting Started with RAG
Quick Implementation
# Complete RAG setup in minimal code
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
# 1. Load documents
loader = TextLoader("your_document.txt")
documents = loader.load()
# 2. Split into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_documents(documents)
# 3. Create embeddings and vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(chunks, embeddings)
# 4. Setup LLM and RAG chain
llm = OpenAI(temperature=0)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever()
)
# 5. Query the system
answer = qa_chain.run("Your question here")
print(answer)
Next Steps
- Experiment with Chunking: Try different splitting strategies for your documents (a small comparison sketch follows this list)
- Optimize Retrieval: Test various retrieval methods and parameters
- Custom Prompts: Create domain-specific prompt templates
- Add Metadata: Enhance documents with rich metadata for better filtering
- Monitor Performance: Track retrieval quality and response accuracy
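As an example of the chunking experiments above, a quick way to compare settings is to split the same documents with a few configurations and inspect the resulting chunk counts and sizes. This is a minimal sketch assuming documents has already been loaded as shown earlier.
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Compare a few chunk_size / chunk_overlap settings on the same documents
for chunk_size, overlap in [(500, 50), (1000, 200), (2000, 400)]:
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    candidate_chunks = splitter.split_documents(documents)
    avg_len = sum(len(c.page_content) for c in candidate_chunks) / max(len(candidate_chunks), 1)
    print(f"chunk_size={chunk_size}, overlap={overlap}: {len(candidate_chunks)} chunks, ~{avg_len:.0f} chars each")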
Additional Resources
- Vector Database Comparison: Choosing the right vector store
- Embedding Model Benchmarks: Performance and cost analysis
- Production RAG Patterns: Scalable architectures and best practices
- RAG Evaluation Methods: Measuring retrieval and generation quality
Master RAG to build intelligent AI systems that can access and utilize vast knowledge bases to provide accurate, contextual, and up-to-date information.