
RAG (Retrieval-Augmented Generation)

Enhance AI models by connecting them to external knowledge sources for more accurate, up-to-date responses

🔍 What is RAG?

Definition: A technique that expands a model's knowledge by connecting it to external databases or documents during response generation.

Simple Analogy: Like giving a smart assistant access to a library. Instead of relying only on memory, it can look up current information from books and documents when answering questions.

Key Insight: RAG combines the reasoning capabilities of large language models with the ability to access fresh, specific information from external sources.

How RAG Works

The RAG Process

  1. User asks a question: "What's the latest news about renewable energy?"
  2. System searches database: Finds relevant documents/articles
  3. Retrieves context: Gets the most relevant information
  4. Augments prompt: Combines question + retrieved context
  5. Model generates answer: Uses both training + fresh context
python
# Simplified RAG workflow; assumes `vector_search`, `document_database`,
# and `llm` are provided by the surrounding system
def rag_response(user_question):
    # Step 1: Retrieve relevant documents
    relevant_docs = vector_search(user_question, document_database)
    
    # Step 2: Create context from retrieved documents
    context = "\n".join([doc.content for doc in relevant_docs[:3]])
    
    # Step 3: Augment the prompt
    augmented_prompt = f"""
    Context: {context}
    
    Question: {user_question}
    
    Answer the question based on the provided context. If the context doesn't contain enough information, say so.
    """
    
    # Step 4: Generate response
    response = llm.generate(augmented_prompt)
    return response

Key Characteristics

  • ❌ No Weight Change: Model parameters stay the same
  • ✅ External Knowledge: Connects to external knowledge sources
  • 🟡 Difficulty: MEDIUM - Requires setup and maintenance
  • 📊 Outcome: Model accesses updated information without retraining

RAG Architecture Components

1. Document Store

The repository where your knowledge is stored:

python
from datetime import datetime

class DocumentStore:
    def __init__(self):
        self.documents = []
    
    def add_document(self, doc_id, content, metadata=None):
        """Add a document to the store"""
        document = {
            'id': doc_id,
            'content': content,
            'metadata': metadata or {},
            'timestamp': datetime.now()
        }
        self.documents.append(document)
    
    def update_document(self, doc_id, new_content):
        """Update existing document"""
        for doc in self.documents:
            if doc['id'] == doc_id:
                doc['content'] = new_content
                doc['timestamp'] = datetime.now()
                return True
        return False
    
    def get_all_documents(self):
        """Retrieve all documents"""
        return self.documents

# Example usage
doc_store = DocumentStore()
doc_store.add_document(
    doc_id="renewable_energy_2024",
    content="Solar panel efficiency has increased to 25% in 2024...",
    metadata={"category": "energy", "year": 2024}
)

2. Vector Database

Converts documents into searchable embeddings:

python
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer

class VectorDatabase:
    def __init__(self, model_name='all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)
        self.embeddings = []
        self.documents = []
        self.doc_ids = []
    
    def add_documents(self, documents):
        """Add documents and create embeddings"""
        for doc in documents:
            # Create embedding for document content
            embedding = self.model.encode(doc['content'])
            
            self.embeddings.append(embedding)
            self.documents.append(doc)
            self.doc_ids.append(doc['id'])
    
    def search(self, query, k=5):
        """Search for similar documents"""
        if not self.embeddings:
            return []
        
        # Create query embedding
        query_embedding = self.model.encode(query)
        
        # Calculate similarities
        similarities = cosine_similarity(
            [query_embedding], 
            self.embeddings
        )[0]
        
        # Get top-k most similar documents
        top_indices = similarities.argsort()[-k:][::-1]
        
        results = []
        for idx in top_indices:
            results.append({
                'document': self.documents[idx],
                'similarity': similarities[idx],
                'doc_id': self.doc_ids[idx]
            })
        
        return results

# Example usage
vector_db = VectorDatabase()

# Add documents from document store
documents = doc_store.get_all_documents()
vector_db.add_documents(documents)

# Search for relevant documents
query = "latest solar panel technology"
results = vector_db.search(query, k=3)

for result in results:
    print(f"Similarity: {result['similarity']:.3f}")
    print(f"Content: {result['document']['content'][:100]}...")
    print("---")

3. Retrieval System

Intelligent document retrieval with ranking and filtering:

python
from datetime import datetime

class AdvancedRetriever:
    def __init__(self, vector_db, doc_store):
        self.vector_db = vector_db
        self.doc_store = doc_store
    
    def retrieve(self, query, k=5, min_similarity=0.3, filters=None):
        """Advanced retrieval with filtering and ranking"""
        # Get initial results from vector search
        candidates = self.vector_db.search(query, k=k*2)  # Get more candidates
        
        # Apply similarity threshold
        candidates = [c for c in candidates if c['similarity'] >= min_similarity]
        
        # Apply metadata filters if provided
        if filters:
            candidates = self._apply_filters(candidates, filters)
        
        # Re-rank results
        ranked_results = self._rerank_results(query, candidates)
        
        # Return top-k results
        return ranked_results[:k]
    
    def _apply_filters(self, candidates, filters):
        """Apply metadata filters"""
        filtered = []
        for candidate in candidates:
            metadata = candidate['document'].get('metadata', {})
            
            # Check if document matches all filters
            matches = True
            for key, value in filters.items():
                if metadata.get(key) != value:
                    matches = False
                    break
            
            if matches:
                filtered.append(candidate)
        
        return filtered
    
    def _rerank_results(self, query, candidates):
        """Re-rank results based on additional criteria"""
        # Simple re-ranking based on recency and similarity
        for candidate in candidates:
            doc = candidate['document']
            
            # Calculate recency score (newer is better)
            days_old = (datetime.now() - doc.get('timestamp', datetime.now())).days
            recency_score = max(0, 1 - days_old / 365)  # Decay over a year
            
            # Combine similarity and recency
            combined_score = (
                candidate['similarity'] * 0.7 + 
                recency_score * 0.3
            )
            candidate['combined_score'] = combined_score
        
        # Sort by combined score
        return sorted(candidates, key=lambda x: x['combined_score'], reverse=True)

# Example usage
retriever = AdvancedRetriever(vector_db, doc_store)

# Retrieve with filters
results = retriever.retrieve(
    query="solar panel efficiency",
    k=3,
    min_similarity=0.2,
    filters={"category": "energy"}
)

for result in results:
    print(f"Combined Score: {result['combined_score']:.3f}")
    print(f"Content: {result['document']['content'][:100]}...")
    print("---")

4. Response Generator

Combines retrieved context with the language model:

python
class RAGGenerator:
    def __init__(self, llm, retriever, max_context_length=2000):
        self.llm = llm
        self.retriever = retriever
        self.max_context_length = max_context_length
    
    def generate_response(self, question, include_sources=True):
        """Generate response using RAG"""
        # Retrieve relevant documents
        retrieved_docs = self.retriever.retrieve(question, k=3)
        
        if not retrieved_docs:
            return self._handle_no_context(question)
        
        # Prepare context
        context = self._prepare_context(retrieved_docs)
        
        # Create augmented prompt
        prompt = self._create_prompt(question, context)
        
        # Generate response
        response = self.llm.generate(prompt)
        
        # Optionally include sources
        if include_sources:
            sources = self._format_sources(retrieved_docs)
            response += f"\n\nSources:\n{sources}"
        
        return response
    
    def _prepare_context(self, retrieved_docs):
        """Prepare context from retrieved documents"""
        context_parts = []
        current_length = 0
        
        for doc_info in retrieved_docs:
            doc_content = doc_info['document']['content']
            
            # Check if adding this document exceeds context limit
            if current_length + len(doc_content) > self.max_context_length:
                # Truncate the content to fit
                remaining_space = self.max_context_length - current_length
                if remaining_space > 100:  # Only add if meaningful space left
                    doc_content = doc_content[:remaining_space] + "..."
                    context_parts.append(doc_content)
                break
            
            context_parts.append(doc_content)
            current_length += len(doc_content)
        
        return "\n\n".join(context_parts)
    
    def _create_prompt(self, question, context):
        """Create the augmented prompt"""
        return f"""Based on the following context, please answer the question. If the context doesn't contain enough information to answer the question, please say so.

Context:
{context}

Question: {question}

Answer:"""
    
    def _handle_no_context(self, question):
        """Handle cases where no relevant context is found"""
        prompt = f"""I don't have specific information about "{question}" in my knowledge base. I can provide general information based on my training, but please note that this may not be the most current information.

Question: {question}

General answer:"""
        return self.llm.generate(prompt)
    
    def _format_sources(self, retrieved_docs):
        """Format source information"""
        sources = []
        for i, doc_info in enumerate(retrieved_docs, 1):
            doc = doc_info['document']
            doc_id = doc.get('id', f'Document {i}')
            similarity = doc_info.get('similarity', 0)
            sources.append(f"{i}. {doc_id} (relevance: {similarity:.2f})")
        
        return "\n".join(sources)

# Example usage
class MockLLM:
    def generate(self, prompt):
        # This would be replaced with actual LLM call
        return f"Generated response based on prompt: {prompt[:50]}..."

llm = MockLLM()
rag_generator = RAGGenerator(llm, retriever)

response = rag_generator.generate_response(
    "What are the latest improvements in solar panel technology?"
)
print(response)

RAG Implementation Patterns

1. Simple RAG

Basic implementation for getting started:

python
def simple_rag(question, documents):
    """Build a RAG prompt using simple keyword-overlap retrieval"""
    # 1. Simple keyword-based retrieval
    relevant_docs = []
    question_words = set(question.lower().split())
    
    for doc in documents:
        doc_words = set(doc['content'].lower().split())
        overlap = len(question_words.intersection(doc_words))
        if overlap > 0:
            relevant_docs.append((doc, overlap))
    
    # 2. Sort by overlap and take top 3
    relevant_docs.sort(key=lambda x: x[1], reverse=True)
    top_docs = relevant_docs[:3]
    
    # 3. Create context
    context = "\n".join([doc[0]['content'] for doc in top_docs])
    
    # 4. Create prompt
    prompt = f"Context: {context}\n\nQuestion: {question}\n\nAnswer:"
    
    return prompt

# Example
documents = [
    {"content": "Solar panels convert sunlight into electricity using photovoltaic cells."},
    {"content": "Wind turbines generate electricity from wind energy."},
    {"content": "Modern solar panels have efficiency rates of 20-25%."}
]

prompt = simple_rag("How efficient are solar panels?", documents)
print(prompt)

2. Hierarchical RAG

Multi-level retrieval for complex documents:

python
class HierarchicalRAG:
    def __init__(self):
        self.document_summaries = {}
        self.document_chunks = {}
    
    def index_document(self, doc_id, full_content, chunk_size=500):
        """Index document with summary and chunks"""
        # Create document summary
        summary = self._create_summary(full_content)
        self.document_summaries[doc_id] = summary
        
        # Create chunks
        chunks = self._create_chunks(full_content, chunk_size)
        self.document_chunks[doc_id] = chunks
    
    def retrieve(self, query, k=5):
        """Two-stage retrieval: summary then chunks"""
        # Stage 1: Find relevant documents using summaries
        relevant_docs = self._find_relevant_docs(query)
        
        # Stage 2: Find relevant chunks within those documents
        relevant_chunks = []
        for doc_id in relevant_docs[:3]:  # Top 3 documents
            chunks = self._find_relevant_chunks(query, doc_id)
            relevant_chunks.extend(chunks)
        
        # Return top-k chunks
        return sorted(relevant_chunks, key=lambda x: x['score'], reverse=True)[:k]
    
    def _create_summary(self, content):
        """Create document summary (simplified)"""
        sentences = content.split('. ')
        return '. '.join(sentences[:3]) + '.'  # First 3 sentences
    
    def _create_chunks(self, content, chunk_size):
        """Split document into chunks"""
        words = content.split()
        chunks = []
        
        for i in range(0, len(words), chunk_size//4):  # Overlap chunks
            chunk_words = words[i:i + chunk_size]
            chunk_text = ' '.join(chunk_words)
            chunks.append({
                'text': chunk_text,
                'start_index': i,
                'word_count': len(chunk_words)
            })
        
        return chunks
    
    def _find_relevant_docs(self, query):
        """Find documents with relevant summaries"""
        query_words = set(query.lower().split())
        scored_docs = []
        
        for doc_id, summary in self.document_summaries.items():
            summary_words = set(summary.lower().split())
            overlap = len(query_words.intersection(summary_words))
            if overlap > 0:
                scored_docs.append((doc_id, overlap))
        
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        return [doc_id for doc_id, _ in scored_docs]
    
    def _find_relevant_chunks(self, query, doc_id):
        """Find relevant chunks within a document"""
        query_words = set(query.lower().split())
        chunks = self.document_chunks.get(doc_id, [])
        scored_chunks = []
        
        for chunk in chunks:
            chunk_words = set(chunk['text'].lower().split())
            overlap = len(query_words.intersection(chunk_words))
            if overlap > 0:
                scored_chunks.append({
                    'doc_id': doc_id,
                    'text': chunk['text'],
                    'score': overlap
                })
        
        return scored_chunks

# Example usage
h_rag = HierarchicalRAG()

# Index a long document
long_document = """
Solar energy technology has advanced significantly in recent years. Modern photovoltaic cells can convert sunlight into electricity with efficiency rates exceeding 25%. The cost of solar panels has dropped dramatically, making solar energy competitive with traditional fossil fuels. Installation processes have also improved, with many residential systems being installed in just one day. Solar energy storage solutions, including lithium-ion batteries, have become more affordable and efficient. This allows homeowners to store excess energy generated during the day for use at night. Government incentives and rebate programs continue to support solar adoption. Many countries have set ambitious renewable energy targets for the coming decades.
"""

h_rag.index_document("solar_tech_2024", long_document)

# Retrieve relevant information
results = h_rag.retrieve("solar panel efficiency rates")
for result in results:
    print(f"Score: {result['score']}")
    print(f"Text: {result['text'][:100]}...")
    print("---")

Real-World RAG Applications

1. Customer Support Chatbot

python
from datetime import datetime

class CustomerSupportRAG:
    def __init__(self):
        self.knowledge_base = []
        self.conversation_history = []
    
    def add_knowledge(self, category, question, answer):
        """Add FAQ or knowledge item"""
        self.knowledge_base.append({
            'category': category,
            'question': question,
            'answer': answer,
            'keywords': self._extract_keywords(question + " " + answer)
        })
    
    def answer_customer_question(self, customer_question, customer_id=None):
        """Answer customer question using knowledge base"""
        # Find relevant knowledge items
        relevant_items = self._find_relevant_knowledge(customer_question)
        
        if not relevant_items:
            return self._escalate_to_human(customer_question, customer_id)
        
        # Create response using most relevant items
        context = self._build_context(relevant_items[:3])
        
        response = f"""Based on our knowledge base:

{context}

For your specific question: "{customer_question}"

{self._generate_specific_answer(customer_question, relevant_items[0])}

Is there anything else I can help you with?"""
        
        # Log interaction
        self._log_interaction(customer_question, response, customer_id)
        
        return response
    
    def _find_relevant_knowledge(self, question):
        """Find relevant knowledge base items"""
        question_keywords = set(question.lower().split())
        scored_items = []
        
        for item in self.knowledge_base:
            item_keywords = set(item['keywords'])
            overlap = len(question_keywords.intersection(item_keywords))
            if overlap > 0:
                scored_items.append((item, overlap))
        
        scored_items.sort(key=lambda x: x[1], reverse=True)
        return [item for item, _ in scored_items]
    
    def _build_context(self, items):
        """Build context from knowledge items"""
        context_parts = []
        for item in items:
            context_parts.append(f"Q: {item['question']}\nA: {item['answer']}\n")
        return "\n".join(context_parts)
    
    def _generate_specific_answer(self, question, best_match):
        """Generate specific answer based on best match"""
        # In a real implementation, this would use an LLM
        return f"Based on our documentation: {best_match['answer']}"
    
    def _extract_keywords(self, text):
        """Extract keywords from text"""
        # Simple keyword extraction
        words = text.lower().split()
        # Filter out common words
        stop_words = {'the', 'is', 'at', 'which', 'on', 'a', 'an', 'and', 'or', 'but'}
        keywords = [word for word in words if word not in stop_words and len(word) > 2]
        return keywords
    
    def _escalate_to_human(self, question, customer_id):
        """Escalate to human agent"""
        return f"I don't have specific information about your question. Let me connect you with a human agent who can help you better. Reference ID: {customer_id or 'GUEST'}"
    
    def _log_interaction(self, question, response, customer_id):
        """Log customer interaction"""
        self.conversation_history.append({
            'customer_id': customer_id,
            'question': question,
            'response': response,
            'timestamp': datetime.now()
        })

# Example usage
support_rag = CustomerSupportRAG()

# Add knowledge base items
support_rag.add_knowledge(
    "billing", 
    "How do I change my billing address?",
    "You can update your billing address by logging into your account and going to Settings > Billing Information."
)

support_rag.add_knowledge(
    "technical",
    "Why is my internet slow?",
    "Slow internet can be caused by: 1) High network traffic, 2) Outdated equipment, 3) Background downloads. Try restarting your modem first."
)

# Answer customer questions
customer_question = "My internet connection is very slow today"
response = support_rag.answer_customer_question(customer_question, "CUST_12345")
print(response)

2. Medical Research Assistant

python
class MedicalRAG:
    def __init__(self):
        self.research_papers = []
        self.clinical_guidelines = []
        self.drug_database = []
    
    def add_research_paper(self, title, abstract, authors, journal, year):
        """Add research paper to database"""
        self.research_papers.append({
            'type': 'research_paper',
            'title': title,
            'abstract': abstract,
            'authors': authors,
            'journal': journal,
            'year': year,
            'content': f"{title}. {abstract}"
        })
    
    def answer_medical_query(self, query, evidence_level="high"):
        """Answer medical query with appropriate evidence"""
        # Find relevant research
        relevant_papers = self._find_relevant_research(query)
        
        # Filter by evidence level if needed
        if evidence_level == "high":
            relevant_papers = [p for p in relevant_papers if p.get('year', 0) >= 2020]
        
        # Create evidence-based response
        response = self._create_medical_response(query, relevant_papers[:5])
        
        return response
    
    def _find_relevant_research(self, query):
        """Find relevant medical research"""
        query_terms = query.lower().split()
        scored_papers = []
        
        for paper in self.research_papers:
            content = paper['content'].lower()
            relevance_score = sum(1 for term in query_terms if term in content)
            
            if relevance_score > 0:
                scored_papers.append((paper, relevance_score))
        
        scored_papers.sort(key=lambda x: x[1], reverse=True)
        return [paper for paper, _ in scored_papers]
    
    def _create_medical_response(self, query, papers):
        """Create evidence-based medical response"""
        if not papers:
            return "I don't have sufficient evidence to answer this medical query."
        
        response = f"Based on recent medical literature:\n\n"
        
        for i, paper in enumerate(papers[:3], 1):
            response += f"{i}. {paper['title']} ({paper['year']})\n"
            response += f"   {paper['abstract'][:200]}...\n\n"
        
        response += f"DISCLAIMER: This information is for educational purposes only and should not replace professional medical advice."
        
        return response

# Example usage
medical_rag = MedicalRAG()

medical_rag.add_research_paper(
    title="Efficacy of vitamin D supplementation in COVID-19 prevention",
    abstract="This randomized controlled trial investigated the effects of vitamin D supplementation on COVID-19 incidence...",
    authors=["Dr. Smith", "Dr. Johnson"],
    journal="New England Journal of Medicine",
    year=2023
)

query = "vitamin D COVID prevention"
response = medical_rag.answer_medical_query(query)
print(response)

🎯 Best Practices for RAG

1. Document Preparation

python
from datetime import datetime

def prepare_documents_for_rag(documents):
    """Best practices for document preparation.

    Assumes `detect_content_type` and `detect_language` helpers exist
    (simple heuristics or a library such as langdetect would work).
    """
    prepared_docs = []
    
    for doc in documents:
        # Clean and normalize text
        clean_content = clean_text(doc['content'])
        
        # Add metadata
        metadata = {
            'source': doc.get('source', 'unknown'),
            'last_updated': doc.get('timestamp', datetime.now()),
            'content_type': detect_content_type(clean_content),
            'word_count': len(clean_content.split()),
            'language': detect_language(clean_content)
        }
        
        # Create chunks if document is long
        if metadata['word_count'] > 1000:
            chunks = create_smart_chunks(clean_content)
            for i, chunk in enumerate(chunks):
                prepared_docs.append({
                    'id': f"{doc['id']}_chunk_{i}",
                    'content': chunk,
                    'metadata': {**metadata, 'chunk_index': i, 'total_chunks': len(chunks)}
                })
        else:
            prepared_docs.append({
                'id': doc['id'],
                'content': clean_content,
                'metadata': metadata
            })
    
    return prepared_docs

def clean_text(text):
    """Clean and normalize text"""
    import re
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    
    # Remove special characters that might interfere
    text = re.sub(r'[^\w\s\.\,\!\?\;\:]', '', text)
    
    # Trim surrounding whitespace
    return text.strip()

def create_smart_chunks(text, chunk_size=500, overlap=50):
    """Create semantically meaningful chunks"""
    sentences = text.split('. ')
    chunks = []
    current_chunk = []
    current_length = 0
    
    for sentence in sentences:
        sentence_length = len(sentence.split())
        
        if current_length + sentence_length > chunk_size and current_chunk:
            # Create chunk
            chunks.append('. '.join(current_chunk) + '.')
            
            # Start new chunk, carrying over the last few sentences as overlap
            # (overlap is in words; roughly 20 words per sentence is assumed)
            overlap_sentences = current_chunk[-overlap//20:] if len(current_chunk) > overlap//20 else current_chunk
            current_chunk = overlap_sentences + [sentence]
            current_length = sum(len(s.split()) for s in current_chunk)
        else:
            current_chunk.append(sentence)
            current_length += sentence_length
    
    # Add final chunk
    if current_chunk:
        chunks.append('. '.join(current_chunk) + '.')
    
    return chunks
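
A quick usage sketch for the helpers above; the input document and its fields are illustrative, and the two detection helpers are stubbed out since their implementation is left open:

python
# Stub the assumed helpers for this sketch
def detect_content_type(text):
    return "text"

def detect_language(text):
    return "en"

# Hypothetical long document, repeated to trigger the chunking path
raw_docs = [{
    'id': 'energy_report',
    'content': "Solar output grew again this year. " * 250,
    'source': 'annual_report'
}]

prepared = prepare_documents_for_rag(raw_docs)
print(f"Prepared {len(prepared)} chunk(s)")
print(prepared[0]['metadata'])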

2. Retrieval Optimization

python
from datetime import datetime

class OptimizedRetriever:
    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.query_cache = {}
    
    def retrieve_with_optimization(self, query, k=5):
        """Optimized retrieval with caching and re-ranking"""
        # Check cache first
        cache_key = f"{query}_{k}"
        if cache_key in self.query_cache:
            return self.query_cache[cache_key]
        
        # Expand query for better retrieval
        expanded_query = self._expand_query(query)
        
        # Retrieve candidates
        candidates = self.vector_db.search(expanded_query, k=k*2)
        
        # Re-rank using multiple signals
        reranked = self._rerank_candidates(query, candidates)
        
        # Cache results
        self.query_cache[cache_key] = reranked[:k]
        
        return reranked[:k]
    
    def _expand_query(self, query):
        """Expand query with synonyms and related terms"""
        # Simple expansion (in practice, use WordNet or embeddings)
        expansions = {
            'car': ['vehicle', 'automobile'],
            'fast': ['quick', 'rapid', 'speedy'],
            'good': ['excellent', 'great', 'positive']
        }
        
        words = query.split()
        expanded_words = []
        
        for word in words:
            expanded_words.append(word)
            if word.lower() in expansions:
                expanded_words.extend(expansions[word.lower()])
        
        return ' '.join(expanded_words)
    
    def _rerank_candidates(self, original_query, candidates):
        """Re-rank candidates using multiple signals"""
        for candidate in candidates:
            doc = candidate['document']
            
            # Calculate multiple scoring signals
            semantic_score = candidate['similarity']
            
            # Keyword overlap score (guard against empty queries)
            query_words = set(original_query.lower().split())
            doc_words = set(doc['content'].lower().split())
            keyword_score = len(query_words.intersection(doc_words)) / max(len(query_words), 1)
            
            # Recency score
            timestamp = doc.get('metadata', {}).get('timestamp', datetime.now())
            days_old = (datetime.now() - timestamp).days
            recency_score = max(0, 1 - days_old / 365)
            
            # Content quality score (simplified)
            word_count = len(doc['content'].split())
            quality_score = min(1.0, word_count / 200)  # Prefer 200+ word documents
            
            # Combined score
            combined_score = (
                semantic_score * 0.4 +
                keyword_score * 0.3 +
                recency_score * 0.2 +
                quality_score * 0.1
            )
            
            candidate['combined_score'] = combined_score
        
        return sorted(candidates, key=lambda x: x['combined_score'], reverse=True)
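
A minimal usage sketch, reusing the `vector_db` built in the Vector Database section (the query is illustrative):

python
optimized = OptimizedRetriever(vector_db)

results = optimized.retrieve_with_optimization("fast solar panels", k=3)
for result in results:
    print(f"Combined Score: {result['combined_score']:.3f}")
    print(f"Content: {result['document']['content'][:80]}...")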

3. Context Management

python
class ContextManager:
    def __init__(self, max_tokens=4000, model_name="gpt-3.5-turbo"):
        self.max_tokens = max_tokens
        self.model_name = model_name
        self.token_overhead = 200  # Reserve for prompt template
    
    def optimize_context(self, query, retrieved_docs):
        """Optimize context to fit within token limits"""
        available_tokens = self.max_tokens - self.token_overhead
        
        # Estimate tokens for query
        query_tokens = self._estimate_tokens(query)
        available_tokens -= query_tokens
        
        # Select and truncate documents to fit
        optimized_context = self._select_best_content(retrieved_docs, available_tokens)
        
        return optimized_context
    
    def _estimate_tokens(self, text):
        """Estimate token count (rough approximation)"""
        # Rough estimate: 1 token ≈ 4 characters for English
        return len(text) // 4
    
    def _select_best_content(self, docs, available_tokens):
        """Select best content within token budget"""
        selected_content = []
        used_tokens = 0
        
        # Sort documents by relevance score
        sorted_docs = sorted(docs, key=lambda x: x.get('similarity', 0), reverse=True)
        
        for doc in sorted_docs:
            content = doc['document']['content']
            content_tokens = self._estimate_tokens(content)
            
            if used_tokens + content_tokens <= available_tokens:
                # Full document fits
                selected_content.append(content)
                used_tokens += content_tokens
            else:
                # Partial document
                remaining_tokens = available_tokens - used_tokens
                if remaining_tokens > 50:  # Only if meaningful space left
                    # Take most relevant sentences
                    partial_content = self._extract_relevant_sentences(
                        content, 
                        remaining_tokens
                    )
                    selected_content.append(partial_content)
                break
        
        return "\n\n".join(selected_content)
    
    def _extract_relevant_sentences(self, content, max_tokens):
        """Extract most relevant sentences that fit in token budget"""
        sentences = content.split('. ')
        selected_sentences = []
        used_tokens = 0
        
        for sentence in sentences:
            sentence_tokens = self._estimate_tokens(sentence)
            if used_tokens + sentence_tokens <= max_tokens:
                selected_sentences.append(sentence)
                used_tokens += sentence_tokens
            else:
                break
        
        return '. '.join(selected_sentences) + '.'
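
A short sketch of how the context manager might slot into the pipeline, reusing the `retriever` from the Retrieval System section:

python
context_manager = ContextManager(max_tokens=1000)

docs = retriever.retrieve("solar panel efficiency", k=5)
context = context_manager.optimize_context("solar panel efficiency", docs)
print(f"Optimized context uses roughly {context_manager._estimate_tokens(context)} tokens")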

🎯 Key Takeaways

When to Use RAG

  • Dynamic Information: Content changes frequently
  • Large Knowledge Base: Too much information to fit in model context
  • Accuracy Requirements: Need current, factual information
  • Domain Expertise: Specialized knowledge not in training data

RAG vs. Alternatives

  • vs. Fine-tuning: RAG is better for changing information
  • vs. Prompt Engineering: RAG provides actual knowledge, not just formatting
  • vs. Function Calling: RAG is for information retrieval, functions for actions

Success Metrics

  • Relevance: How well retrieved documents match the query
  • Accuracy: Correctness of the final generated answer
  • Coverage: Percentage of questions that can be answered
  • Latency: Response time including retrieval and generation
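
A minimal sketch of how the retrieval-side metrics might be tracked, assuming a small hand-labeled set of (question, expected document id) pairs; `evaluate_rag` and the labels below are illustrative:

python
import time

def evaluate_rag(retriever, labeled_queries, k=3):
    """Measure hit rate (a relevance/coverage proxy) and average retrieval latency."""
    hits, latencies = 0, []

    for question, expected_doc_id in labeled_queries:
        start = time.perf_counter()
        results = retriever.retrieve(question, k=k)
        latencies.append(time.perf_counter() - start)

        # A "hit" means the expected document appears in the top-k results
        if any(r['doc_id'] == expected_doc_id for r in results):
            hits += 1

    return {
        'hit_rate': hits / len(labeled_queries),
        'avg_latency_seconds': sum(latencies) / len(latencies)
    }

# Example with the AdvancedRetriever built earlier (labels are illustrative)
labeled = [("solar panel efficiency", "renewable_energy_2024")]
print(evaluate_rag(retriever, labeled))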
