Retrieval Basics - Building Knowledge-Aware AI Systems
Learn to integrate external knowledge sources with LLMs through document retrieval, vector search, and RAG (Retrieval-Augmented Generation) patterns
Understanding Information Retrieval
Information retrieval in LangChain enables AI systems to access and utilize external knowledge sources, making responses more accurate, current, and contextually relevant than relying solely on pre-trained model knowledge.
The Knowledge Problem
text
                THE KNOWLEDGE LIMITATION PROBLEM
                     (Why retrieval matters)

+----------------------- LLM LIMITATIONS ----------------------------+
|  Training Cutoff : knowledge frozen at training time               |
|  Hallucination   : may generate plausible but false information    |
|  Context Size    : limited input context window                    |
|  Domain Gaps     : missing specialized knowledge                   |
|  Static Memory   : cannot learn from new information               |
+---------------------------------+-----------------------------------+
                                  |
                                  v   RETRIEVAL SOLUTION
+----------------- RETRIEVAL-AUGMENTED GENERATION --------------------+
|  Dynamic Knowledge : access current information                     |
|  Accurate Sources  : ground responses in real documents             |
|  Scalable Context  : process large knowledge bases                  |
|  Updatable         : add new information without retraining         |
|  Verifiable        : cite sources for transparency                  |
+----------------------------------------------------------------------+
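To make the pattern concrete before diving into the individual components, the whole RAG loop fits in a few lines: retrieve documents that match the question, paste them into the prompt as context, and let the model answer from that context. The sketch below is illustrative only; it assumes a `vectorstore` and `llm` configured as shown in the sections that follow.

python
# Minimal RAG loop (illustrative sketch; `vectorstore` and `llm` are assumed
# to be set up as described in the sections below)
def answer_with_retrieval(question: str, vectorstore, llm, k: int = 4) -> str:
    # 1. Retrieve: find the k chunks most similar to the question
    docs = vectorstore.similarity_search(question, k=k)
    # 2. Augment: stuff the retrieved chunks into the prompt as context
    context = "\n\n".join(doc.page_content for doc in docs)
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {question}"
    # 3. Generate: the answer is grounded in the retrieved context
    return llm.invoke(prompt).content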
Core Retrieval Components
Document Loaders
python
from langchain_community.document_loaders import (
TextLoader, PyPDFLoader, WebBaseLoader,
DirectoryLoader, CSVLoader, JSONLoader
)
from langchain_core.documents import Document
# Text file loading
text_loader = TextLoader("data/documents/guide.txt")
text_docs = text_loader.load()
print(f"Loaded {len(text_docs)} documents")
print(f"First doc content: {text_docs[0].page_content[:200]}...")
print(f"Metadata: {text_docs[0].metadata}")
# PDF loading with metadata
pdf_loader = PyPDFLoader("data/reports/annual_report.pdf")
pdf_docs = pdf_loader.load()
for i, doc in enumerate(pdf_docs[:3]):
print(f"Page {i+1}: {doc.page_content[:100]}...")
print(f"Page metadata: {doc.metadata}")
# Web scraping
web_loader = WebBaseLoader([
"https://python.langchain.com/docs/introduction/",
"https://python.langchain.com/docs/get_started/quickstart"
])
web_docs = web_loader.load()
print(f"Loaded {len(web_docs)} web pages")
# Directory loading with filtering
directory_loader = DirectoryLoader(
"data/knowledge_base/",
glob="**/*.md", # Only markdown files
loader_cls=TextLoader,
show_progress=True
)
directory_docs = directory_loader.load()
# CSV loading with custom processing
csv_loader = CSVLoader(
file_path="data/faq.csv",
csv_args={
'delimiter': ',',
'quotechar': '"',
'fieldnames': ['question', 'answer', 'category']
}
)
csv_docs = csv_loader.load()
# JSON loading with custom parsing
json_loader = JSONLoader(
file_path="data/products.json",
jq_schema='.products[]', # Extract products array
text_content=False
)
json_docs = json_loader.load()
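# Note: loading a large directory eagerly can be slow and memory-hungry.
# Most loaders also expose lazy_load(), which yields Document objects one at
# a time instead of materializing the whole list. A minimal sketch (counts
# and processing here are illustrative):
lazy_count = 0
for doc in directory_loader.lazy_load():
    lazy_count += 1  # process/stream each document as it arrives
print(f"Streamed {lazy_count} documents lazily")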
# Custom document creation
def create_custom_documents():
"""Create documents from custom data sources"""
custom_docs = []
# Example: Convert API responses to documents
api_data = [
{"title": "Python Basics", "content": "Python is...", "author": "John"},
{"title": "AI Fundamentals", "content": "AI involves...", "author": "Jane"}
]
for item in api_data:
doc = Document(
page_content=item["content"],
metadata={
"title": item["title"],
"author": item["author"],
"source": "api",
"timestamp": "2024-01-01"
}
)
custom_docs.append(doc)
return custom_docs
custom_docs = create_custom_documents()
print(f"Created {len(custom_docs)} custom documents")βοΈ Text Splitters β
python
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter,
SpacyTextSplitter
)
# Recursive character splitter (most common)
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # Target chunk size
chunk_overlap=200, # Overlap between chunks
length_function=len, # How to measure length
separators=["\n\n", "\n", " ", ""] # Split hierarchy
)
# Split documents
sample_text = """
This is a long document that needs to be split into smaller chunks for processing.
It contains multiple paragraphs and sections.
Each section covers different topics that should ideally stay together when possible.
The splitter will try to maintain semantic coherence while respecting size limits.
This helps maintain context while making the text manageable for embedding and retrieval.
"""
chunks = recursive_splitter.split_text(sample_text)
print(f"Split into {len(chunks)} chunks:")
for i, chunk in enumerate(chunks):
print(f"Chunk {i+1}: {chunk[:100]}...")
# Split loaded documents
split_docs = recursive_splitter.split_documents(text_docs)
print(f"Original docs: {len(text_docs)}, Split docs: {len(split_docs)}")
# Token-based splitter (for token limits)
token_splitter = TokenTextSplitter(
chunk_size=500, # 500 tokens per chunk
chunk_overlap=50 # 50 token overlap
)
token_chunks = token_splitter.split_text(sample_text)
print(f"Token-based chunks: {len(token_chunks)}")
# Code-aware splitter
from langchain.text_splitter import PythonCodeTextSplitter
code_text = '''
def fibonacci(n):
"""Calculate fibonacci number"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
class MathUtils:
"""Utility class for math operations"""
@staticmethod
def factorial(n):
if n <= 1:
return 1
return n * MathUtils.factorial(n-1)
def gcd(self, a, b):
while b:
a, b = b, a % b
return a
'''
python_splitter = PythonCodeTextSplitter(chunk_size=200, chunk_overlap=0)
code_chunks = python_splitter.split_text(code_text)
print(f"Code chunks: {len(code_chunks)}")
for i, chunk in enumerate(code_chunks):
print(f"Code chunk {i+1}:\n{chunk}\n---")
# Custom semantic splitter
class SemanticTextSplitter:
"""Split text based on semantic boundaries"""
def __init__(self, chunk_size=1000, overlap=100):
self.chunk_size = chunk_size
self.overlap = overlap
def split_text(self, text: str) -> list[str]:
"""Split text at sentence boundaries when possible"""
import re
# Split into sentences
sentences = re.split(r'[.!?]+', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
current_chunk = ""
for sentence in sentences:
# Check if adding sentence exceeds chunk size
if len(current_chunk + sentence) > self.chunk_size and current_chunk:
chunks.append(current_chunk.strip())
# Start new chunk with overlap
overlap_text = current_chunk[-self.overlap:] if len(current_chunk) > self.overlap else current_chunk
current_chunk = overlap_text + " " + sentence
else:
current_chunk += " " + sentence if current_chunk else sentence
# Add final chunk
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
semantic_splitter = SemanticTextSplitter(chunk_size=200, overlap=50)
semantic_chunks = semantic_splitter.split_text(sample_text)
print(f"Semantic chunks: {len(semantic_chunks)}")π Vector Stores and Embeddings β
python
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS
from langchain_core.vectorstores import VectorStoreRetriever
import numpy as np
# Initialize embeddings model
embeddings = OpenAIEmbeddings(
model="text-embedding-ada-002",
chunk_size=1000 # Process embeddings in batches
)
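# What an embedding actually is: a fixed-length vector of floats. Inspecting
# one query embedding makes similarity search less abstract (the exact
# dimensionality depends on the embedding model you use).
sample_vector = embeddings.embed_query("How do I read a file in Python?")
print(f"Embedding dimensions: {len(sample_vector)}")
print(f"First values: {sample_vector[:5]}")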
# Create vector store from documents
def create_vector_store(documents, store_type="chroma"):
"""Create and populate vector store"""
if store_type == "chroma":
# Persistent Chroma store
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
persist_directory="./chroma_db"
)
elif store_type == "faiss":
# In-memory FAISS store (faster for smaller datasets)
vectorstore = FAISS.from_documents(
documents=documents,
embedding=embeddings
)
return vectorstore
# Create vector store
vectorstore = create_vector_store(split_docs, store_type="chroma")
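# Persistence sketch: a Chroma store created with persist_directory can be
# reopened later without re-embedding, and a FAISS index can be saved to and
# loaded from disk. Paths below are illustrative assumptions.
reloaded_store = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)
# faiss_store = FAISS.from_documents(split_docs, embeddings)
# faiss_store.save_local("./faiss_index")
# reloaded_faiss = FAISS.load_local(
#     "./faiss_index", embeddings, allow_dangerous_deserialization=True
# )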
# Basic similarity search
query = "What are the key features of Python?"
similar_docs = vectorstore.similarity_search(query, k=4)
print(f"Found {len(similar_docs)} similar documents:")
for i, doc in enumerate(similar_docs):
print(f"Doc {i+1}: {doc.page_content[:200]}...")
print(f"Metadata: {doc.metadata}")
print("---")
# Similarity search with scores
docs_with_scores = vectorstore.similarity_search_with_score(query, k=3)
print("Documents with similarity scores:")
for doc, score in docs_with_scores:
print(f"Score: {score:.4f}")
print(f"Content: {doc.page_content[:150]}...")
print("---")
# Maximum Marginal Relevance (MMR) search
# Balances relevance with diversity
mmr_docs = vectorstore.max_marginal_relevance_search(
query=query,
k=4,
fetch_k=10, # Fetch more candidates
lambda_mult=0.7 # Balance relevance vs diversity
)
print(f"MMR search found {len(mmr_docs)} diverse documents")
# Custom similarity threshold
def search_with_threshold(vectorstore, query, threshold=0.8, max_results=10):
"""Search with minimum similarity threshold"""
docs_with_scores = vectorstore.similarity_search_with_score(
query, k=max_results
)
# Filter by threshold
filtered_docs = [
(doc, score) for doc, score in docs_with_scores
if score <= threshold # Lower scores = higher similarity in some stores
]
return filtered_docs
threshold_results = search_with_threshold(vectorstore, query, threshold=0.5)
print(f"Results above threshold: {len(threshold_results)}")π― Retrievers β
python
from langchain_core.retrievers import BaseRetriever
from langchain_openai import ChatOpenAI  # LLM used by multi-query and compression retrievers
from langchain.retrievers import (
MultiQueryRetriever,
EnsembleRetriever,
ContextualCompressionRetriever
)
from langchain.retrievers.document_compressors import LLMChainExtractor
# Basic vector store retriever
basic_retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# Test basic retrieval
retrieved_docs = basic_retriever.get_relevant_documents(query)
print(f"Basic retrieval: {len(retrieved_docs)} documents")
# Multi-query retriever (generates multiple query variations)
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=basic_retriever,
llm=ChatOpenAI(temperature=0)
)
multi_query_docs = multi_query_retriever.get_relevant_documents(
"How to handle errors in Python?"
)
print(f"Multi-query retrieval: {len(multi_query_docs)} documents")
# Ensemble retriever (combines multiple retrieval methods)
# Create a keyword-based retriever
from langchain.retrievers import BM25Retriever
# Extract text content for BM25
doc_texts = [doc.page_content for doc in split_docs]
bm25_retriever = BM25Retriever.from_texts(doc_texts)
bm25_retriever.k = 3
# Combine vector and keyword search
ensemble_retriever = EnsembleRetriever(
retrievers=[basic_retriever, bm25_retriever],
weights=[0.7, 0.3] # Favor vector search
)
ensemble_docs = ensemble_retriever.get_relevant_documents(query)
print(f"Ensemble retrieval: {len(ensemble_docs)} documents")
# Contextual compression retriever
compressor = LLMChainExtractor.from_llm(ChatOpenAI(temperature=0))
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=basic_retriever
)
compressed_docs = compression_retriever.get_relevant_documents(
"Explain Python data structures"
)
print("Compressed retrieval results:")
for doc in compressed_docs:
print(f"Compressed: {doc.page_content}")
print("---")
# Custom hybrid retriever
class HybridRetriever(BaseRetriever):
    """Custom retriever combining vector and keyword search strategies"""
    # BaseRetriever is a pydantic model, so sub-retrievers are declared as fields
    vector_retriever: BaseRetriever
    keyword_retriever: BaseRetriever
    rerank_model: object = None  # optional cross-encoder or similar

    def _get_relevant_documents(self, query: str, *, run_manager=None) -> list:
        """Retrieve and rerank documents"""
        # Get results from both retrievers
        vector_docs = self.vector_retriever.get_relevant_documents(query)
        keyword_docs = self.keyword_retriever.get_relevant_documents(query)
        # Combine and deduplicate
        all_docs = vector_docs + keyword_docs
        unique_docs = self._deduplicate_documents(all_docs)
        # Optional reranking
        if self.rerank_model:
            return self._rerank_documents(unique_docs, query)
        return unique_docs[:10]  # Return top 10
def _deduplicate_documents(self, docs):
"""Remove duplicate documents based on content similarity"""
seen_content = set()
unique_docs = []
for doc in docs:
# Simple deduplication based on first 100 characters
content_hash = hash(doc.page_content[:100])
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
return unique_docs
def _rerank_documents(self, docs, query):
"""Rerank documents based on relevance"""
# Placeholder for reranking logic
# In practice, you might use a cross-encoder model
return docs
hybrid_retriever = HybridRetriever(vector_retriever=basic_retriever, keyword_retriever=bm25_retriever)
hybrid_docs = hybrid_retriever.get_relevant_documents(query)
print(f"Hybrid retrieval: {len(hybrid_docs)} documents")π RAG Implementation Patterns β
Basic RAG Chain
python
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
def format_docs(docs):
"""Format retrieved documents for context"""
return "\n\n".join([f"Source {i+1}:\n{doc.page_content}" for i, doc in enumerate(docs)])
# Basic RAG chain
rag_chain = (
{"context": basic_retriever | format_docs, "question": RunnablePassthrough()}
| ChatPromptTemplate.from_template("""
Answer the question based on the following context:
Context:
{context}
Question: {question}
Instructions:
- Use only information from the provided context
- If the answer isn't in the context, say "I don't have enough information"
- Cite specific sources when possible
- Be concise but comprehensive
Answer:""")
| ChatOpenAI()
| StrOutputParser()
)
# Test basic RAG
rag_answer = rag_chain.invoke("What are the best practices for Python coding?")
print(f"RAG Answer: {rag_answer}")
# RAG with source attribution
def format_docs_with_sources(docs):
"""Format docs with source tracking"""
formatted_docs = []
sources = []
for i, doc in enumerate(docs):
source_id = f"[{i+1}]"
sources.append({
"id": source_id,
"metadata": doc.metadata,
"content_preview": doc.page_content[:100] + "..."
})
formatted_docs.append(f"{source_id} {doc.page_content}")
return {
"context": "\n\n".join(formatted_docs),
"sources": sources
}
# Enhanced RAG with sources
enhanced_rag_chain = (
    RunnablePassthrough.assign(
        docs=lambda x: basic_retriever.get_relevant_documents(x["question"])
    )
    | RunnablePassthrough.assign(
        context_data=lambda x: format_docs_with_sources(x["docs"])
    )
    | RunnablePassthrough.assign(
        context=lambda x: x["context_data"]["context"]
    )
    | RunnablePassthrough.assign(
        response=ChatPromptTemplate.from_template("""
Based on the following context, answer the question. Include source references.
Context:
{context}
Question: {question}
Answer with source citations:""")
        | ChatOpenAI()
        | StrOutputParser()
    )
)
enhanced_result = enhanced_rag_chain.invoke({
"question": "How do I handle exceptions in Python?"
})
print(f"Enhanced answer: {enhanced_result['response']}")
print(f"Sources used: {enhanced_result['context_data']['sources']}")π Advanced RAG Patterns β
python
# Multi-step RAG with query analysis
def create_multi_step_rag():
"""RAG with query analysis and refinement"""
# Step 1: Analyze and expand query
query_analyzer = (
ChatPromptTemplate.from_template("""
Analyze this question and provide:
1. Key concepts to search for
2. Alternative phrasings
3. Related topics to consider
Question: {question}
Analysis:""")
| ChatOpenAI()
| StrOutputParser()
)
# Step 2: Enhanced retrieval
def enhanced_retrieval(inputs):
original_query = inputs["question"]
analysis = inputs["analysis"]
# Get docs for original query
original_docs = basic_retriever.get_relevant_documents(original_query)
# Extract key concepts for additional searches
# In practice, you'd parse the analysis more carefully
additional_docs = []
if "concepts:" in analysis.lower():
# Simplified concept extraction
concepts = ["Python", "programming", "best practices"] # Would parse from analysis
for concept in concepts:
concept_docs = basic_retriever.get_relevant_documents(concept)
additional_docs.extend(concept_docs[:2])
# Combine and deduplicate
all_docs = original_docs + additional_docs
unique_docs = {doc.page_content: doc for doc in all_docs}
return list(unique_docs.values())[:8]
# Step 3: Answer generation
answer_generator = (
ChatPromptTemplate.from_template("""
Question Analysis: {analysis}
Based on the retrieved context, provide a comprehensive answer:
Context:
{context}
Original Question: {question}
Comprehensive Answer:""")
| ChatOpenAI()
| StrOutputParser()
)
# Combine steps
multi_step_chain = (
RunnablePassthrough.assign(analysis=query_analyzer)
| RunnablePassthrough.assign(
retrieved_docs=enhanced_retrieval
)
| RunnablePassthrough.assign(
context=lambda x: format_docs(x["retrieved_docs"])
)
| answer_generator
)
return multi_step_chain
multi_step_rag = create_multi_step_rag()
complex_answer = multi_step_rag.invoke({
"question": "What are the most important Python concepts for beginners?"
})
print(f"Multi-step RAG answer: {complex_answer}")
# Conversational RAG with memory
from langchain.memory import ConversationBufferMemory
class ConversationalRAG:
"""RAG system with conversation memory"""
def __init__(self, retriever, llm):
self.retriever = retriever
self.llm = llm
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
def ask(self, question: str) -> str:
"""Ask question with conversation context"""
# Get conversation history
history = self.memory.chat_memory.messages
# Enhance query with context
if history:
context_enhanced_query = f"""
Previous conversation context: {self._format_history(history)}
Current question: {question}
Enhanced search query considering context:"""
enhanced_query = self.llm.invoke(context_enhanced_query).content
else:
enhanced_query = question
# Retrieve relevant documents
docs = self.retriever.get_relevant_documents(enhanced_query)
context = format_docs(docs)
# Generate answer with conversation awareness
prompt = f"""
Conversation History:
{self._format_history(history)}
Retrieved Context:
{context}
Current Question: {question}
Answer considering both conversation context and retrieved information:"""
answer = self.llm.invoke(prompt).content
# Save to memory
self.memory.chat_memory.add_user_message(question)
self.memory.chat_memory.add_ai_message(answer)
return answer
def _format_history(self, messages):
"""Format conversation history"""
formatted = []
for msg in messages[-6:]: # Last 3 exchanges
role = "Human" if msg.type == "human" else "Assistant"
formatted.append(f"{role}: {msg.content}")
return "\n".join(formatted)
conversational_rag = ConversationalRAG(basic_retriever, ChatOpenAI())
# Test conversational flow
response1 = conversational_rag.ask("What is Python?")
print(f"Response 1: {response1}")
response2 = conversational_rag.ask("What are its main advantages?")
print(f"Response 2: {response2}")
response3 = conversational_rag.ask("How do I get started learning it?")
print(f"Response 3: {response3}")π― Retrieval Optimization Strategies β
Performance Optimization
python
import time
from typing import List, Dict, Any
class OptimizedRetriever:
"""High-performance retriever with caching and batch processing"""
def __init__(self, vectorstore, cache_size=100):
self.vectorstore = vectorstore
self.cache = {}
self.cache_size = cache_size
self.stats = {"hits": 0, "misses": 0, "avg_time": 0}
def retrieve_with_cache(self, query: str, k: int = 5) -> List[Any]:
"""Retrieve with caching for performance"""
cache_key = f"{query}_{k}"
# Check cache
if cache_key in self.cache:
self.stats["hits"] += 1
return self.cache[cache_key]
# Cache miss - perform retrieval
start_time = time.time()
docs = self.vectorstore.similarity_search(query, k=k)
retrieval_time = time.time() - start_time
# Update stats
self.stats["misses"] += 1
total_requests = self.stats["hits"] + self.stats["misses"]
self.stats["avg_time"] = (
(self.stats["avg_time"] * (total_requests - 1) + retrieval_time)
/ total_requests
)
# Cache result
if len(self.cache) >= self.cache_size:
# Remove oldest entry
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[cache_key] = docs
return docs
def batch_retrieve(self, queries: List[str], k: int = 5) -> Dict[str, List[Any]]:
"""Batch retrieval for efficiency"""
results = {}
# Separate cached and uncached queries
cached_queries = []
uncached_queries = []
for query in queries:
cache_key = f"{query}_{k}"
if cache_key in self.cache:
results[query] = self.cache[cache_key]
cached_queries.append(query)
else:
uncached_queries.append(query)
# Process uncached queries in batch
if uncached_queries:
for query in uncached_queries:
docs = self.vectorstore.similarity_search(query, k=k)
results[query] = docs
# Cache result
cache_key = f"{query}_{k}"
self.cache[cache_key] = docs
return results
def get_stats(self) -> Dict[str, Any]:
"""Get performance statistics"""
total_requests = self.stats["hits"] + self.stats["misses"]
hit_rate = self.stats["hits"] / total_requests if total_requests > 0 else 0
return {
"cache_hit_rate": hit_rate,
"total_requests": total_requests,
"average_retrieval_time": self.stats["avg_time"],
"cache_size": len(self.cache)
}
# Test optimized retriever
optimized_retriever = OptimizedRetriever(vectorstore)
# Single queries
docs1 = optimized_retriever.retrieve_with_cache("Python basics")
docs2 = optimized_retriever.retrieve_with_cache("Python basics") # Should hit cache
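# Caching can also happen one level lower, at the embedding layer. LangChain's
# CacheBackedEmbeddings (if present in your installed version) memoizes
# document embeddings in a byte store so re-indexing the same text costs
# almost nothing. Hedged sketch; the cache path is an illustrative assumption.
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore

embedding_cache = LocalFileStore("./embedding_cache")
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    embeddings, embedding_cache, namespace="text-embedding-ada-002"
)
# To benefit, build the vector store with cached_embeddings instead of the raw
# embeddings object, e.g. Chroma.from_documents(split_docs, cached_embeddings)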
# Batch queries
batch_queries = [
"machine learning algorithms",
"data structures in Python",
"web development frameworks",
"Python basics" # Already cached
]
batch_results = optimized_retriever.batch_retrieve(batch_queries)
print(f"Batch results: {len(batch_results)} queries processed")
# Performance stats
stats = optimized_retriever.get_stats()
print(f"Retriever stats: {stats}")π― Query Enhancement β
python
class QueryEnhancer:
"""Enhance queries for better retrieval results"""
def __init__(self, llm):
self.llm = llm
def expand_query(self, query: str) -> List[str]:
"""Generate multiple query variations"""
expansion_prompt = f"""
Generate 3-5 alternative phrasings for this search query that would help find relevant information:
Original query: {query}
Alternative queries (one per line):"""
response = self.llm.invoke(expansion_prompt).content
# Parse response into list
alternatives = [
line.strip()
for line in response.split('\n')
if line.strip() and not line.strip().startswith('Alternative')
]
return [query] + alternatives[:4] # Original + up to 4 alternatives
def extract_keywords(self, query: str) -> List[str]:
"""Extract key search terms"""
keyword_prompt = f"""
Extract the most important keywords and phrases from this query for document search:
Query: {query}
Keywords (comma-separated):"""
response = self.llm.invoke(keyword_prompt).content
keywords = [kw.strip() for kw in response.split(',')]
return keywords
def contextualize_query(self, query: str, conversation_history: List[str]) -> str:
"""Add conversation context to query"""
if not conversation_history:
return query
context_prompt = f"""
Given this conversation history, rewrite the current query to be more specific and contextual:
Conversation history:
{chr(10).join(conversation_history[-4:])}
Current query: {query}
Contextualized query:"""
response = self.llm.invoke(context_prompt).content
return response.strip()
# Enhanced retrieval pipeline
class EnhancedRetrievalPipeline:
"""Complete retrieval pipeline with query enhancement"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.query_enhancer = QueryEnhancer(llm)
self.optimized_retriever = OptimizedRetriever(vectorstore)
def retrieve(self, query: str, conversation_history: List[str] = None, k: int = 5) -> Dict[str, Any]:
"""Enhanced retrieval with multiple strategies"""
# Step 1: Contextualize query if history available
if conversation_history:
contextualized_query = self.query_enhancer.contextualize_query(
query, conversation_history
)
else:
contextualized_query = query
# Step 2: Expand query
query_variations = self.query_enhancer.expand_query(contextualized_query)
# Step 3: Retrieve for each variation
all_docs = []
for variation in query_variations:
docs = self.optimized_retriever.retrieve_with_cache(variation, k=3)
all_docs.extend(docs)
# Step 4: Deduplicate and rank
unique_docs = self._deduplicate_and_rank(all_docs, query, k)
# Step 5: Extract keywords for additional context
keywords = self.query_enhancer.extract_keywords(query)
return {
"documents": unique_docs,
"original_query": query,
"contextualized_query": contextualized_query,
"query_variations": query_variations,
"keywords": keywords,
"total_retrieved": len(all_docs),
"final_count": len(unique_docs)
}
def _deduplicate_and_rank(self, docs: List[Any], original_query: str, k: int) -> List[Any]:
"""Remove duplicates and rank by relevance"""
# Simple deduplication by content similarity
seen_content = set()
unique_docs = []
for doc in docs:
content_hash = hash(doc.page_content[:200]) # First 200 chars
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
# In practice, you might use a reranking model here
# For now, just return top k
return unique_docs[:k]
# Test enhanced pipeline
enhanced_pipeline = EnhancedRetrievalPipeline(vectorstore, ChatOpenAI())
# Test with conversation context
conversation_history = [
"Human: What is Python?",
"Assistant: Python is a programming language...",
"Human: What about its libraries?"
]
enhanced_result = enhanced_pipeline.retrieve(
"How do I use them?",
conversation_history=conversation_history
)
print(f"Enhanced retrieval result: {enhanced_result}")
print(f"Found {enhanced_result['final_count']} unique documents")
print(f"Query variations: {enhanced_result['query_variations']}")π Document Management and Updates β
Dynamic Document Updates
python
class DynamicKnowledgeBase:
"""Knowledge base with dynamic updates and versioning"""
def __init__(self, vectorstore, embeddings):
self.vectorstore = vectorstore
self.embeddings = embeddings
self.document_registry = {} # Track document metadata
self.version_counter = 0
def add_documents(self, documents: List[Document], source: str = "unknown") -> List[str]:
"""Add new documents to the knowledge base"""
# Add metadata
for doc in documents:
doc.metadata.update({
"source": source,
"added_timestamp": time.time(),
"version": self.version_counter
})
# Add to vector store
doc_ids = self.vectorstore.add_documents(documents)
# Update registry
for doc_id, doc in zip(doc_ids, documents):
self.document_registry[doc_id] = {
"content_preview": doc.page_content[:100],
"metadata": doc.metadata,
"status": "active"
}
self.version_counter += 1
return doc_ids
def update_document(self, doc_id: str, new_content: str, metadata_updates: Dict = None):
"""Update existing document"""
if doc_id not in self.document_registry:
raise ValueError(f"Document {doc_id} not found")
# Mark old version as inactive
self.document_registry[doc_id]["status"] = "updated"
# Create new document
old_metadata = self.document_registry[doc_id]["metadata"].copy()
if metadata_updates:
old_metadata.update(metadata_updates)
old_metadata.update({
"updated_timestamp": time.time(),
"version": self.version_counter,
"replaces": doc_id
})
new_doc = Document(page_content=new_content, metadata=old_metadata)
new_doc_ids = self.add_documents([new_doc], source="update")
return new_doc_ids[0]
def delete_document(self, doc_id: str):
"""Soft delete document"""
if doc_id in self.document_registry:
self.document_registry[doc_id]["status"] = "deleted"
# Note: Actual removal from vector store depends on implementation
def search_with_filters(self, query: str, filters: Dict = None, k: int = 5) -> List[Document]:
"""Search with metadata filters"""
# Basic search
docs = self.vectorstore.similarity_search(query, k=k*2) # Get more to filter
# Apply filters
if filters:
filtered_docs = []
for doc in docs:
match = True
for key, value in filters.items():
if key not in doc.metadata or doc.metadata[key] != value:
match = False
break
if match:
filtered_docs.append(doc)
docs = filtered_docs[:k]
# Filter out inactive documents
active_docs = []
for doc in docs:
doc_id = self._get_doc_id(doc) # Implementation-specific
if doc_id and self.document_registry.get(doc_id, {}).get("status") == "active":
active_docs.append(doc)
return active_docs[:k]
def _get_doc_id(self, doc: Document) -> str:
"""Get document ID - implementation specific"""
# This would depend on your vector store implementation
return doc.metadata.get("doc_id", "")
def get_knowledge_base_stats(self) -> Dict[str, Any]:
"""Get knowledge base statistics"""
total_docs = len(self.document_registry)
active_docs = sum(1 for doc in self.document_registry.values() if doc["status"] == "active")
return {
"total_documents": total_docs,
"active_documents": active_docs,
"current_version": self.version_counter,
"sources": list(set(doc["metadata"]["source"] for doc in self.document_registry.values()))
}
# Example usage
dynamic_kb = DynamicKnowledgeBase(vectorstore, embeddings)
# Add new documents
new_docs = [
Document(page_content="Python 3.12 introduces new features...", metadata={"topic": "python"}),
Document(page_content="Machine learning best practices include...", metadata={"topic": "ml"})
]
doc_ids = dynamic_kb.add_documents(new_docs, source="manual_entry")
print(f"Added documents: {doc_ids}")
# Search with filters
filtered_results = dynamic_kb.search_with_filters(
"Python features",
filters={"topic": "python"},
k=3
)
print(f"Filtered search: {len(filtered_results)} results")
# Get stats
stats = dynamic_kb.get_knowledge_base_stats()
print(f"Knowledge base stats: {stats}")π Integration Patterns β
Complete RAG System
python
class ProductionRAGSystem:
"""Production-ready RAG system with monitoring and error handling"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.setup_components()
self.setup_monitoring()
def setup_components(self):
"""Initialize all RAG components"""
# Document processing
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.config.get("chunk_size", 1000),
chunk_overlap=self.config.get("chunk_overlap", 200)
)
# Embeddings and vector store
self.embeddings = OpenAIEmbeddings()
self.vectorstore = Chroma(
persist_directory=self.config.get("vector_db_path", "./chroma_db"),
embedding_function=self.embeddings
)
# Enhanced retrieval pipeline
self.retrieval_pipeline = EnhancedRetrievalPipeline(
self.vectorstore,
ChatOpenAI()
)
# Response generation
self.response_chain = self._create_response_chain()
def _create_response_chain(self):
"""Create the response generation chain"""
return (
ChatPromptTemplate.from_template("""
You are a helpful AI assistant. Answer the question based on the provided context.
Context:
{context}
Question: {question}
Instructions:
- Provide accurate, helpful answers based on the context
- If information is not in the context, say so clearly
- Include relevant source references
- Be concise but comprehensive
Answer:""")
| ChatOpenAI(temperature=self.config.get("temperature", 0.1))
| StrOutputParser()
)
def setup_monitoring(self):
"""Setup monitoring and logging"""
self.query_log = []
self.performance_metrics = {
"total_queries": 0,
"average_response_time": 0,
"retrieval_success_rate": 0
}
def add_documents(self, file_paths: List[str]) -> Dict[str, Any]:
"""Add documents from files"""
results = {"success": [], "failed": []}
for file_path in file_paths:
try:
# Load document
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
elif file_path.endswith('.txt'):
loader = TextLoader(file_path)
else:
results["failed"].append(f"{file_path}: Unsupported format")
continue
docs = loader.load()
# Split documents
split_docs = self.text_splitter.split_documents(docs)
# Add to vector store
self.vectorstore.add_documents(split_docs)
results["success"].append(f"{file_path}: {len(split_docs)} chunks added")
except Exception as e:
results["failed"].append(f"{file_path}: {str(e)}")
return results
def query(self, question: str, conversation_history: List[str] = None) -> Dict[str, Any]:
"""Process a query through the RAG system"""
start_time = time.time()
try:
# Enhanced retrieval
retrieval_result = self.retrieval_pipeline.retrieve(
question,
conversation_history or []
)
# Format context
context = format_docs(retrieval_result["documents"])
# Generate response
response = self.response_chain.invoke({
"context": context,
"question": question
})
# Calculate metrics
response_time = time.time() - start_time
# Log query
query_log_entry = {
"question": question,
"response_time": response_time,
"num_docs_retrieved": len(retrieval_result["documents"]),
"success": True,
"timestamp": time.time()
}
self.query_log.append(query_log_entry)
# Update metrics
self._update_metrics(query_log_entry)
return {
"answer": response,
"sources": retrieval_result["documents"],
"query_variations": retrieval_result["query_variations"],
"response_time": response_time,
"success": True
}
except Exception as e:
error_time = time.time() - start_time
# Log error
error_log_entry = {
"question": question,
"response_time": error_time,
"success": False,
"error": str(e),
"timestamp": time.time()
}
self.query_log.append(error_log_entry)
return {
"answer": "I apologize, but I encountered an error processing your question.",
"error": str(e),
"response_time": error_time,
"success": False
}
def _update_metrics(self, query_log_entry: Dict[str, Any]):
"""Update performance metrics"""
self.performance_metrics["total_queries"] += 1
# Update average response time
total_queries = self.performance_metrics["total_queries"]
current_avg = self.performance_metrics["average_response_time"]
new_time = query_log_entry["response_time"]
self.performance_metrics["average_response_time"] = (
(current_avg * (total_queries - 1) + new_time) / total_queries
)
# Update success rate
successful_queries = sum(1 for log in self.query_log if log["success"])
self.performance_metrics["retrieval_success_rate"] = (
successful_queries / total_queries
)
def get_metrics(self) -> Dict[str, Any]:
"""Get system performance metrics"""
return {
"performance": self.performance_metrics,
"recent_queries": len(self.query_log),
"vector_store_stats": self._get_vector_store_stats()
}
def _get_vector_store_stats(self) -> Dict[str, Any]:
"""Get vector store statistics"""
# This would depend on your vector store implementation
return {"status": "active"}
# Example configuration and usage
rag_config = {
"chunk_size": 1000,
"chunk_overlap": 200,
"vector_db_path": "./production_chroma_db",
"temperature": 0.1
}
# Initialize production RAG system
rag_system = ProductionRAGSystem(rag_config)
# Add documents
file_results = rag_system.add_documents([
"data/python_guide.pdf",
"data/best_practices.txt"
])
print(f"Document loading results: {file_results}")
# Query the system
result = rag_system.query(
"What are the best practices for error handling in Python?",
conversation_history=["Previous question about Python basics"]
)
print(f"RAG Response: {result['answer']}")
print(f"Response time: {result['response_time']:.2f}s")
# Get system metrics
metrics = rag_system.get_metrics()
print(f"System metrics: {metrics}")π Next Steps β
Ready to build more sophisticated AI systems? Continue with:
- Agents Basics - Build autonomous AI agents that use tools
- Tools - Integrate external APIs and services
- Memory Basics - Add conversation memory and context
Key Retrieval Takeaways:
- RAG is essential for accurate, current, and verifiable AI responses
- Document processing quality directly impacts retrieval effectiveness
- Vector search excels at semantic similarity; keyword search handles exact matches
- Query enhancement significantly improves retrieval quality
- Hybrid approaches combining multiple strategies work best
- Monitoring and optimization are crucial for production systems
- Dynamic updates enable evolving knowledge bases