# RAG (Retrieval-Augmented Generation)
Summary: Retrieval-Augmented Generation (RAG) is a hybrid AI approach that combines large language models with external knowledge retrieval systems. By first retrieving relevant documents from a knowledge base and then using that context to generate responses, RAG systems can provide more accurate, up-to-date, and factually grounded answers while reducing hallucinations and enabling access to information beyond the model's training data.
## Overview
Retrieval-Augmented Generation (RAG) represents a paradigm shift in how AI systems access and utilize knowledge. Rather than relying solely on information encoded in model parameters during training, RAG systems dynamically retrieve relevant information from external knowledge sources and incorporate it into the generation process. This approach addresses key limitations of pure language models: knowledge cutoffs, hallucinations, and inability to access private or real-time information.
## Core Architecture
### Traditional LLM vs. RAG Approach
#### Traditional LLM Response Process
```text
User Query → LLM → Response (based only on training data)
Limitations:
- Knowledge cutoff date
- Cannot access private documents
- May hallucinate facts
- No source citations
```
#### RAG Response Process
```text
User Query → Retrieval System → Relevant Documents → LLM + Context → Augmented Response
Benefits:
- Up-to-date information
- Private knowledge base access
- Reduced hallucinations
- Source attribution
```
### RAG System Components
```python
class RAGSystem:
"""Core components of a RAG system"""
def __init__(self, knowledge_base, embedding_model, language_model):
# 1. Knowledge Base: External information repository
self.knowledge_base = knowledge_base # Vector database, documents, APIs
# 2. Retrieval System: Finds relevant information
self.embedding_model = embedding_model # For semantic search
self.vector_store = VectorStore(embedding_model)
# 3. Generation System: Creates responses using retrieved context
self.language_model = language_model # GPT, Claude, etc.
# 4. Orchestration: Coordinates retrieval and generation
self.retrieval_strategy = "semantic_search"
self.context_fusion_method = "prepend"
def process_query(self, user_query):
"""Main RAG pipeline"""
# Step 1: Retrieve relevant documents
relevant_docs = self.retrieve_documents(user_query)
# Step 2: Construct augmented prompt
augmented_prompt = self.create_augmented_prompt(user_query, relevant_docs)
# Step 3: Generate response with context
response = self.generate_response(augmented_prompt)
# Step 4: Post-process and add citations
final_response = self.add_citations(response, relevant_docs)
return final_response
def retrieve_documents(self, query):
"""Retrieve relevant documents from knowledge base"""
# Convert query to embedding
query_embedding = self.embedding_model.encode(query)
# Semantic search in vector database
similar_docs = self.vector_store.similarity_search(
query_embedding,
k=5 # Top-5 most relevant documents
)
return similar_docs
def create_augmented_prompt(self, query, documents):
"""Combine user query with retrieved context"""
context = "\n\n".join([doc.content for doc in documents])
prompt = f"""
Based on the following context information, please answer the question.
If the answer cannot be found in the context, say so clearly.
Context:
{context}
Question: {query}
Answer:"""
return prompt
```
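The `VectorStore` referenced in the constructor above is not defined in this article. As a rough illustration only (not the API of any particular vector database), a minimal in-memory version backed by cosine similarity might look like the following; the `Document` container and the `add_documents` method are hypothetical names introduced here.

```python
import numpy as np
from dataclasses import dataclass, field

@dataclass
class Document:
    """Hypothetical document container used throughout these sketches."""
    content: str
    metadata: dict = field(default_factory=dict)

class VectorStore:
    """Minimal in-memory vector store (illustration only, not a production index)."""
    def __init__(self, embedding_model):
        self.embedding_model = embedding_model
        self.documents = []   # stored Document objects
        self.embeddings = []  # parallel list of L2-normalized vectors

    def add_documents(self, documents):
        """Embed and index a batch of documents."""
        for doc in documents:
            vector = np.asarray(self.embedding_model.encode(doc.content), dtype=float)
            self.embeddings.append(vector / (np.linalg.norm(vector) + 1e-10))
            self.documents.append(doc)

    def similarity_search(self, query_embedding, k=5):
        """Return the k documents most similar to the query by cosine similarity."""
        if not self.documents:
            return []
        query = np.asarray(query_embedding, dtype=float)
        query = query / (np.linalg.norm(query) + 1e-10)
        scores = np.stack(self.embeddings) @ query
        top_indices = np.argsort(scores)[::-1][:k]
        return [self.documents[i] for i in top_indices]
```

In production this role is typically played by a dedicated vector database (Pinecone, Weaviate, Chroma, etc.), as discussed later in this article.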
## Retrieval Strategies
### 1. Dense Vector Retrieval
The most common approach, using embedding-based semantic similarity between the query and document chunks:
```python
class DenseVectorRetriever:
def __init__(self, embedding_model, vector_database):
self.embedding_model = embedding_model
self.vector_db = vector_database
def retrieve(self, query, top_k=5):
"""Retrieve using dense vector similarity"""
# 1. Encode query
query_vector = self.embedding_model.encode(query)
# 2. Search vector database
results = self.vector_db.similarity_search(
query_vector,
k=top_k,
metric="cosine" # or "dot_product", "euclidean"
)
# 3. Return ranked results
return results
# Example usage with different embedding models
retrievers = {
"openai": DenseVectorRetriever(
OpenAIEmbeddings("text-embedding-ada-002"),
ChromaDB()
),
"sentence_transformers": DenseVectorRetriever(
SentenceTransformers("all-MiniLM-L6-v2"),
Pinecone()
),
"cohere": DenseVectorRetriever(
CohereEmbeddings("embed-english-v2.0"),
Weaviate()
)
}
```
### 2. Hybrid Retrieval (Dense + Sparse)
Combines semantic and keyword-based search:
```python
class HybridRetriever:
def __init__(self, dense_retriever, sparse_retriever, alpha=0.5):
self.dense_retriever = dense_retriever
self.sparse_retriever = sparse_retriever # BM25, TF-IDF
self.alpha = alpha # Weighting between dense and sparse
def retrieve(self, query, top_k=5):
"""Combine dense and sparse retrieval results"""
# Get results from both systems
dense_results = self.dense_retriever.retrieve(query, top_k * 2)
sparse_results = self.sparse_retriever.retrieve(query, top_k * 2)
# Combine and re-rank
combined_scores = self.combine_scores(dense_results, sparse_results)
        # Return the top-k documents ranked by combined score
        combined_scores.sort(key=lambda pair: pair[1], reverse=True)
        return [doc for doc, score in combined_scores[:top_k]]
def combine_scores(self, dense_results, sparse_results):
"""Weighted combination of dense and sparse scores"""
# Normalize scores to [0, 1] range
dense_normalized = self.normalize_scores(dense_results)
sparse_normalized = self.normalize_scores(sparse_results)
        # Create document and score maps keyed by document id
        doc_lookup = {}
        doc_scores = {}
        # Add weighted dense scores
        for doc, score in dense_normalized:
            doc_lookup[doc.id] = doc
            doc_scores[doc.id] = self.alpha * score
        # Add weighted sparse scores
        for doc, score in sparse_normalized:
            doc_lookup[doc.id] = doc
            doc_scores[doc.id] = doc_scores.get(doc.id, 0.0) + (1 - self.alpha) * score
        # Return (document, combined_score) pairs for ranking in retrieve()
        return [(doc_lookup[doc_id], score) for doc_id, score in doc_scores.items()]
# BM25 sparse retriever example
from rank_bm25 import BM25Okapi
class BM25Retriever:
def __init__(self, documents):
self.documents = documents
tokenized_docs = [doc.content.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
def retrieve(self, query, top_k=5):
"""BM25-based keyword retrieval"""
tokenized_query = query.lower().split()
scores = self.bm25.get_scores(tokenized_query)
# Get top-k documents
top_indices = scores.argsort()[-top_k:][::-1]
results = []
for idx in top_indices:
results.append({
'document': self.documents[idx],
'score': scores[idx]
})
return results
```
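`normalize_scores` is used by `combine_scores` but never shown. A minimal sketch of that method on `HybridRetriever`, assuming each result is a `(document, score)` pair and using min-max normalization (reciprocal-rank fusion is a common alternative):

```python
def normalize_scores(self, results):
    """Min-max normalize raw scores into [0, 1]; expects (document, score) pairs."""
    if not results:
        return []
    scores = [score for _, score in results]
    low, high = min(scores), max(scores)
    if high == low:
        # All raw scores identical: treat every document as equally relevant
        return [(doc, 1.0) for doc, _ in results]
    return [(doc, (score - low) / (high - low)) for doc, score in results]
```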
### 3. Multi-Step Retrieval
Iterative retrieval for complex queries:
```python
class MultiStepRetriever:
def __init__(self, base_retriever, llm):
self.base_retriever = base_retriever
self.llm = llm
def retrieve(self, complex_query, max_steps=3):
"""Multi-step retrieval for complex questions"""
all_documents = []
current_query = complex_query
for step in range(max_steps):
# Retrieve documents for current query
docs = self.base_retriever.retrieve(current_query)
all_documents.extend(docs)
# Check if we have sufficient information
if self.has_sufficient_info(complex_query, all_documents):
break
# Generate follow-up query
current_query = self.generate_followup_query(
complex_query, all_documents
)
return self.deduplicate_and_rank(all_documents)
def has_sufficient_info(self, query, documents):
"""Determine if retrieved documents are sufficient"""
context = "\n".join([doc.content for doc in documents])
assessment_prompt = f"""
Question: {query}
Retrieved Information: {context}
Can the question be fully answered with the provided information?
Respond with only "YES" or "NO".
"""
response = self.llm.generate(assessment_prompt)
return "YES" in response.upper()
def generate_followup_query(self, original_query, retrieved_docs):
"""Generate more specific follow-up query"""
context = "\n".join([doc.content for doc in retrieved_docs])
followup_prompt = f"""
Original question: {original_query}
Information found so far: {context}
What specific information is still needed to fully answer the original question?
Generate a focused search query for the missing information.
Follow-up query:"""
return self.llm.generate(followup_prompt).strip()
```
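The `deduplicate_and_rank` call that closes the loop is also left undefined. A minimal sketch on `MultiStepRetriever` that drops exact duplicates by content and otherwise preserves the order in which documents were retrieved (assuming the base retriever already returns them ranked) could be:

```python
def deduplicate_and_rank(self, documents):
    """Remove exact duplicates while preserving the original retrieval order."""
    seen_content = set()
    unique_docs = []
    for doc in documents:
        fingerprint = hash(doc.content)
        if fingerprint not in seen_content:
            seen_content.add(fingerprint)
            unique_docs.append(doc)
    return unique_docs
```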
## Knowledge Base Construction
### Document Processing Pipeline
```python
# Assumed imports for the helpers used below (text splitter from LangChain,
# cosine_similarity from scikit-learn for the semantic chunker)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sklearn.metrics.pairwise import cosine_similarity

class DocumentProcessor:
def __init__(self, chunk_size=1000, chunk_overlap=200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
def process_documents(self, raw_documents):
"""Complete document processing pipeline"""
processed_docs = []
for doc in raw_documents:
# 1. Extract text from various formats
text = self.extract_text(doc)
# 2. Clean and preprocess
cleaned_text = self.clean_text(text)
# 3. Split into chunks
chunks = self.chunk_document(cleaned_text, doc.metadata)
# 4. Generate embeddings
for chunk in chunks:
chunk.embedding = self.generate_embedding(chunk.content)
processed_docs.append(chunk)
return processed_docs
def extract_text(self, document):
"""Extract text from different file formats"""
if document.type == "pdf":
return self.extract_from_pdf(document.path)
elif document.type == "docx":
return self.extract_from_docx(document.path)
elif document.type == "html":
return self.extract_from_html(document.content)
elif document.type == "txt":
return document.content
else:
raise ValueError(f"Unsupported document type: {document.type}")
def chunk_document(self, text, metadata):
"""Split document into overlapping chunks"""
chunks = self.text_splitter.split_text(text)
document_chunks = []
for i, chunk_text in enumerate(chunks):
chunk = DocumentChunk(
content=chunk_text,
metadata={
**metadata,
'chunk_id': i,
'chunk_size': len(chunk_text),
'total_chunks': len(chunks)
}
)
document_chunks.append(chunk)
return document_chunks
# Advanced chunking strategies
class AdvancedChunker:
def __init__(self):
self.strategies = {
'semantic': self.semantic_chunking,
'sentence': self.sentence_chunking,
'paragraph': self.paragraph_chunking,
'section': self.section_chunking
}
def semantic_chunking(self, text, model):
"""Chunk based on semantic coherence"""
sentences = self.split_sentences(text)
sentence_embeddings = model.encode(sentences)
# Find semantic boundaries using cosine similarity
chunks = []
current_chunk = [sentences[0]]
for i in range(1, len(sentences)):
similarity = cosine_similarity(
sentence_embeddings[i-1:i],
sentence_embeddings[i:i+1]
)[0][0]
if similarity < 0.7: # Semantic boundary threshold
chunks.append(" ".join(current_chunk))
current_chunk = [sentences[i]]
else:
current_chunk.append(sentences[i])
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
```
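`AdvancedChunker` registers sentence, paragraph, and section strategies, but only the semantic variant is shown, and the `split_sentences` helper is missing. Hedged sketches of the sentence- and paragraph-based strategies for that class, using a simple regex splitter rather than a full NLP sentence tokenizer, might look like:

```python
import re

def split_sentences(self, text):
    """Naive sentence splitter on terminal punctuation (sufficient for a sketch)."""
    return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

def sentence_chunking(self, text, max_sentences=5):
    """Group consecutive sentences into fixed-size chunks."""
    sentences = self.split_sentences(text)
    return [
        " ".join(sentences[i:i + max_sentences])
        for i in range(0, len(sentences), max_sentences)
    ]

def paragraph_chunking(self, text):
    """Treat blank-line-separated paragraphs as chunks."""
    return [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
```

A section-based strategy would follow the same pattern, splitting on headings instead of blank lines.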
### Metadata Enhancement
```python
class MetadataEnricher:
def __init__(self, llm):
self.llm = llm
def enrich_chunk_metadata(self, chunk):
"""Add AI-generated metadata to chunks"""
# Generate summary
chunk.metadata['summary'] = self.generate_summary(chunk.content)
# Extract key topics
chunk.metadata['topics'] = self.extract_topics(chunk.content)
# Identify document type/section
chunk.metadata['section_type'] = self.classify_section(chunk.content)
# Generate questions this chunk could answer
chunk.metadata['potential_questions'] = self.generate_questions(chunk.content)
return chunk
def generate_summary(self, text):
"""Generate concise summary of chunk"""
prompt = f"""
Summarize the following text in 1-2 sentences:
{text}
Summary:"""
return self.llm.generate(prompt).strip()
def extract_topics(self, text):
"""Extract key topics/themes"""
prompt = f"""
Extract the main topics and themes from this text.
Return as a comma-separated list of 3-5 key terms.
Text: {text}
Topics:"""
response = self.llm.generate(prompt).strip()
return [topic.strip() for topic in response.split(',')]
def generate_questions(self, text):
"""Generate questions this text could answer"""
prompt = f"""
Generate 3 specific questions that could be answered using this text:
{text}
Questions:
1."""
response = self.llm.generate(prompt)
questions = []
for line in response.split('\n'):
if line.strip() and any(line.strip().startswith(str(i)) for i in range(1, 10)):
question = line.strip()[2:].strip() # Remove number prefix
questions.append(question)
return questions
```
## Advanced RAG Techniques
### 1. Re-ranking and Filtering
```python
# cosine_similarity (scikit-learn) is used by RelevanceFilter below
from sklearn.metrics.pairwise import cosine_similarity

class CrossEncoderReranker:
def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.cross_encoder = CrossEncoder(model_name)
def rerank(self, query, documents, top_k=5):
"""Re-rank documents using cross-encoder"""
# Create query-document pairs
pairs = [(query, doc.content) for doc in documents]
# Score all pairs
scores = self.cross_encoder.predict(pairs)
# Sort by score and return top-k
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, score in scored_docs[:top_k]]
class RelevanceFilter:
def __init__(self, threshold=0.5):
self.threshold = threshold
def filter_relevant(self, query, documents, embedder):
"""Filter out irrelevant documents"""
query_embedding = embedder.encode([query])
doc_embeddings = embedder.encode([doc.content for doc in documents])
similarities = cosine_similarity(query_embedding, doc_embeddings)[0]
relevant_docs = []
for doc, similarity in zip(documents, similarities):
if similarity >= self.threshold:
doc.relevance_score = similarity
relevant_docs.append(doc)
return relevant_docs
```
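A common way to wire these pieces together is retrieve-wide-then-rerank: over-retrieve candidates, filter out clearly irrelevant chunks, and let the cross-encoder choose the final few. The sketch below is illustrative only and assumes the `retriever` and `embedder` objects follow the interfaces used earlier in this article:

```python
def retrieve_and_rerank(query, retriever, embedder, top_k=5):
    """Candidate retrieval followed by relevance filtering and cross-encoder re-ranking."""
    # 1. Over-retrieve so the re-ranker has candidates to choose from
    candidates = retriever.retrieve(query, top_k=top_k * 4)
    # 2. Drop candidates below a coarse embedding-similarity threshold
    filtered = RelevanceFilter(threshold=0.3).filter_relevant(query, candidates, embedder)
    # 3. Let the cross-encoder produce the final ordering
    reranker = CrossEncoderReranker()
    return reranker.rerank(query, filtered or candidates, top_k=top_k)
```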
### 2. Context Optimization
```python
class ContextOptimizer:
def __init__(self, max_tokens=4000):
self.max_tokens = max_tokens
def optimize_context(self, query, documents):
"""Optimize retrieved context for LLM input"""
# 1. Remove redundant information
deduplicated_docs = self.remove_duplicates(documents)
# 2. Rank by relevance
ranked_docs = self.rank_by_relevance(query, deduplicated_docs)
# 3. Fit within token limit
optimized_context = self.fit_token_limit(ranked_docs)
# 4. Structure for optimal LLM consumption
structured_context = self.structure_context(optimized_context)
return structured_context
def remove_duplicates(self, documents):
"""Remove duplicate or highly similar content"""
unique_docs = []
seen_content = set()
for doc in documents:
# Simple content hashing for exact duplicates
content_hash = hash(doc.content)
if content_hash not in seen_content:
# Check for semantic similarity with existing docs
is_duplicate = False
for existing_doc in unique_docs:
similarity = self.calculate_similarity(doc.content, existing_doc.content)
if similarity > 0.9: # High similarity threshold
is_duplicate = True
break
if not is_duplicate:
unique_docs.append(doc)
seen_content.add(content_hash)
return unique_docs
def structure_context(self, documents):
"""Structure context for optimal LLM processing"""
structured_context = ""
for i, doc in enumerate(documents, 1):
# Add source attribution
source = doc.metadata.get('source', f'Document {i}')
structured_context += f"""
Source {i}: {source}
{doc.content}
---
"""
return structured_context.strip()
```
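`rank_by_relevance` and `calculate_similarity` are referenced above without definitions. One lightweight option for `ContextOptimizer`, sketched below, uses lexical (Jaccard) overlap so no second embedding pass is needed; cosine similarity over embeddings would be a drop-in replacement:

```python
def calculate_similarity(self, text_a, text_b):
    """Cheap lexical similarity (Jaccard over word sets) for near-duplicate detection."""
    words_a, words_b = set(text_a.lower().split()), set(text_b.lower().split())
    if not words_a or not words_b:
        return 0.0
    return len(words_a & words_b) / len(words_a | words_b)

def rank_by_relevance(self, query, documents):
    """Order documents by any relevance score attached at retrieval time,
    falling back to lexical overlap with the query."""
    def score(doc):
        attached = getattr(doc, 'relevance_score', None)
        return attached if attached is not None else self.calculate_similarity(query, doc.content)
    return sorted(documents, key=score, reverse=True)
```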
### 3. Query Enhancement
```python
class QueryEnhancer:
def __init__(self, llm):
self.llm = llm
def enhance_query(self, original_query):
"""Enhance query for better retrieval"""
# Generate multiple query variations
variations = self.generate_query_variations(original_query)
# Extract key entities and concepts
entities = self.extract_entities(original_query)
# Add domain context if available
domain_context = self.identify_domain(original_query)
return {
'original': original_query,
'variations': variations,
'entities': entities,
'domain': domain_context
}
def generate_query_variations(self, query):
"""Generate different phrasings of the same query"""
prompt = f"""
Generate 3 different ways to phrase this question while keeping the same meaning:
Original: {query}
Variations:
1."""
response = self.llm.generate(prompt)
variations = []
for line in response.split('\n'):
if line.strip() and any(line.strip().startswith(str(i)) for i in range(1, 10)):
variation = line.strip()[2:].strip()
variations.append(variation)
return variations
class IterativeQueryRefinement:
def __init__(self, retriever, llm):
self.retriever = retriever
self.llm = llm
def refine_query_iteratively(self, initial_query, max_iterations=3):
"""Iteratively refine query based on retrieval results"""
current_query = initial_query
all_documents = []
for iteration in range(max_iterations):
# Retrieve with current query
docs = self.retriever.retrieve(current_query)
all_documents.extend(docs)
# Assess if we have good results
if self.assess_results_quality(initial_query, docs):
break
# Refine query based on retrieved documents
current_query = self.refine_query(initial_query, docs)
return self.deduplicate_documents(all_documents)
def assess_results_quality(self, query, documents):
"""Assess if retrieved documents are sufficient"""
if not documents:
return False
# Simple heuristic: check if top documents contain query terms
query_terms = query.lower().split()
for doc in documents[:3]: # Check top 3 documents
doc_content = doc.content.lower()
term_matches = sum(1 for term in query_terms if term in doc_content)
if term_matches / len(query_terms) > 0.5: # More than half terms match
return True
return False
```
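The enhanced query only pays off once it drives retrieval. A common pattern is multi-query fan-out: run the original question plus each generated variation through the retriever and merge the results by best rank. A sketch, assuming the `retrieve(query, top_k)` interface used earlier:

```python
def retrieve_with_variations(enhanced_query, retriever, top_k=5):
    """Fan retrieval out across query variations and merge results by best rank."""
    queries = [enhanced_query['original']] + enhanced_query.get('variations', [])
    best_rank = {}  # content fingerprint -> (best rank seen, document)
    for query in queries:
        for rank, doc in enumerate(retriever.retrieve(query, top_k=top_k)):
            key = hash(doc.content)
            if key not in best_rank or rank < best_rank[key][0]:
                best_rank[key] = (rank, doc)
    # Keep the documents that achieved the best rank under any phrasing
    merged = sorted(best_rank.values(), key=lambda pair: pair[0])
    return [doc for _, doc in merged[:top_k]]
```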
## Evaluation and Optimization
### RAG-Specific Metrics
```python
class RAGEvaluator:
def __init__(self, llm_judge):
self.llm_judge = llm_judge
def evaluate_rag_system(self, queries, ground_truth_answers, rag_system):
"""Comprehensive RAG system evaluation"""
results = {
'retrieval_metrics': {},
'generation_metrics': {},
'end_to_end_metrics': {}
}
for query, expected_answer in zip(queries, ground_truth_answers):
# Get RAG response
response = rag_system.process_query(query)
# Evaluate retrieval quality
retrieval_score = self.evaluate_retrieval(
query, response.retrieved_docs, expected_answer
)
# Evaluate generation quality
generation_score = self.evaluate_generation(
query, response.answer, expected_answer, response.retrieved_docs
)
# End-to-end evaluation
e2e_score = self.evaluate_end_to_end(
query, response.answer, expected_answer
)
# Store results
results['retrieval_metrics'][query] = retrieval_score
results['generation_metrics'][query] = generation_score
results['end_to_end_metrics'][query] = e2e_score
return self.aggregate_results(results)
def evaluate_retrieval(self, query, retrieved_docs, ground_truth):
"""Evaluate retrieval component"""
metrics = {}
# Relevance: How relevant are retrieved documents?
relevance_scores = []
for doc in retrieved_docs:
relevance = self.judge_relevance(query, doc.content)
relevance_scores.append(relevance)
metrics['avg_relevance'] = sum(relevance_scores) / len(relevance_scores)
# Coverage: Do retrieved docs contain information needed for answer?
coverage = self.judge_coverage(retrieved_docs, ground_truth)
metrics['coverage'] = coverage
# Diversity: How diverse are the retrieved documents?
diversity = self.calculate_diversity(retrieved_docs)
metrics['diversity'] = diversity
return metrics
def evaluate_generation(self, query, generated_answer, expected_answer, context):
"""Evaluate generation component"""
metrics = {}
# Faithfulness: Is answer consistent with retrieved context?
faithfulness = self.judge_faithfulness(generated_answer, context)
metrics['faithfulness'] = faithfulness
# Groundedness: Is answer grounded in provided context?
groundedness = self.judge_groundedness(generated_answer, context)
metrics['groundedness'] = groundedness
# Answer quality: How good is the answer overall?
answer_quality = self.judge_answer_quality(
query, generated_answer, expected_answer
)
metrics['answer_quality'] = answer_quality
return metrics
def judge_faithfulness(self, answer, context):
"""Judge if answer is faithful to the retrieved context"""
prompt = f"""
Context: {context}
Answer: {answer}
Is the answer faithful to the context? Does it only use information
present in the context without adding unsupported claims?
Rate faithfulness on a scale of 1-5 (5 = completely faithful).
Provide only the number.
"""
response = self.llm_judge.generate(prompt)
try:
return int(response.strip())
except ValueError:
return 3 # Default neutral score
def judge_groundedness(self, answer, context):
"""Judge if answer is properly grounded in context"""
prompt = f"""
Context: {context}
Answer: {answer}
How well is the answer grounded in the provided context?
Consider:
- Does the answer cite or reference the context appropriately?
- Are claims supported by the context?
- Is there appropriate attribution?
Rate groundedness on a scale of 1-5.
Provide only the number.
"""
response = self.llm_judge.generate(prompt)
try:
return int(response.strip())
except ValueError:
return 3
```
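`calculate_diversity` is referenced in the retrieval metrics but not defined. One straightforward choice is the average pairwise dissimilarity of the retrieved chunks' embeddings; it is sketched here as a standalone helper taking an embedding model, since the evaluator's constructor only receives the judge LLM:

```python
from itertools import combinations
from sklearn.metrics.pairwise import cosine_similarity

def calculate_diversity(documents, embedder):
    """Average pairwise dissimilarity (1 - cosine similarity) across retrieved chunks."""
    if len(documents) < 2:
        return 0.0
    embeddings = embedder.encode([doc.content for doc in documents])
    dissimilarities = [
        1.0 - cosine_similarity([embeddings[i]], [embeddings[j]])[0][0]
        for i, j in combinations(range(len(documents)), 2)
    ]
    return sum(dissimilarities) / len(dissimilarities)
```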
### A/B Testing Framework
```python
class RAGABTester:
def __init__(self):
self.experiments = {}
self.metrics = [
'response_quality',
'retrieval_accuracy',
'user_satisfaction',
'response_time'
]
def setup_experiment(self, name, control_config, treatment_config):
"""Setup A/B test between two RAG configurations"""
self.experiments[name] = {
'control': RAGSystem(**control_config),
'treatment': RAGSystem(**treatment_config),
'control_results': [],
'treatment_results': [],
'queries': []
}
def run_experiment(self, experiment_name, test_queries):
"""Run A/B test on a set of queries"""
experiment = self.experiments[experiment_name]
for query in test_queries:
# Run both systems
control_response = experiment['control'].process_query(query)
treatment_response = experiment['treatment'].process_query(query)
# Collect metrics
control_metrics = self.collect_metrics(query, control_response)
treatment_metrics = self.collect_metrics(query, treatment_response)
experiment['control_results'].append(control_metrics)
experiment['treatment_results'].append(treatment_metrics)
experiment['queries'].append(query)
def analyze_results(self, experiment_name):
"""Analyze A/B test results"""
experiment = self.experiments[experiment_name]
results = {}
for metric in self.metrics:
control_values = [r[metric] for r in experiment['control_results']]
treatment_values = [r[metric] for r in experiment['treatment_results']]
# Statistical significance test
p_value = self.statistical_test(control_values, treatment_values)
results[metric] = {
'control_mean': sum(control_values) / len(control_values),
'treatment_mean': sum(treatment_values) / len(treatment_values),
'improvement': self.calculate_improvement(control_values, treatment_values),
'p_value': p_value,
'significant': p_value < 0.05
}
return results
```
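`statistical_test` and `calculate_improvement` are not shown above. A reasonable sketch of these two `RAGABTester` methods uses Welch's two-sample t-test from SciPy and reports the relative change in means; for skewed metrics such as response time a non-parametric test (e.g. Mann-Whitney U) may be more appropriate:

```python
from scipy import stats

def statistical_test(self, control_values, treatment_values):
    """Welch's two-sample t-test; returns the p-value."""
    _, p_value = stats.ttest_ind(control_values, treatment_values, equal_var=False)
    return p_value

def calculate_improvement(self, control_values, treatment_values):
    """Relative improvement of the treatment mean over the control mean."""
    control_mean = sum(control_values) / len(control_values)
    treatment_mean = sum(treatment_values) / len(treatment_values)
    if control_mean == 0:
        return 0.0
    return (treatment_mean - control_mean) / control_mean
```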
## Production Deployment
### Scalability Considerations
```python
import asyncio
import time

class ProductionRAGSystem:
def __init__(self, config):
self.config = config
self.setup_components()
def setup_components(self):
"""Setup production-ready components"""
# Distributed vector database
self.vector_db = self.setup_vector_database()
# Caching layer
self.cache = self.setup_cache()
# Load balancer for LLM calls
self.llm_pool = self.setup_llm_pool()
# Monitoring and logging
self.monitor = self.setup_monitoring()
def setup_vector_database(self):
"""Setup scalable vector database"""
if self.config.vector_db == "pinecone":
return PineconeVectorStore(
api_key=self.config.pinecone_api_key,
environment=self.config.pinecone_env,
index_name=self.config.index_name
)
elif self.config.vector_db == "weaviate":
return WeaviateVectorStore(
url=self.config.weaviate_url,
api_key=self.config.weaviate_api_key
)
elif self.config.vector_db == "chroma":
return ChromaVectorStore(
persist_directory=self.config.chroma_persist_dir
)
def setup_cache(self):
"""Setup response caching for performance"""
return RedisCache(
host=self.config.redis_host,
port=self.config.redis_port,
ttl=self.config.cache_ttl # Time to live
)
def setup_llm_pool(self):
"""Setup LLM connection pool for high throughput"""
return LLMPool(
models=[
{"provider": "openai", "model": "gpt-3.5-turbo", "weight": 0.7},
{"provider": "anthropic", "model": "claude-3-sonnet", "weight": 0.3}
],
max_connections=10,
retry_strategy="exponential_backoff"
)
async def process_query_async(self, user_query):
"""Async query processing for high throughput"""
# Check cache first
cached_response = await self.cache.get(user_query)
if cached_response:
self.monitor.log_cache_hit(user_query)
return cached_response
# Process query
start_time = time.time()
try:
# Parallel retrieval and query enhancement
retrieval_task = asyncio.create_task(
self.retrieve_documents_async(user_query)
)
query_enhancement_task = asyncio.create_task(
self.enhance_query_async(user_query)
)
# Wait for both tasks
documents, enhanced_query = await asyncio.gather(
retrieval_task, query_enhancement_task
)
# Generate response
response = await self.generate_response_async(enhanced_query, documents)
# Cache response
await self.cache.set(user_query, response)
# Log metrics
processing_time = time.time() - start_time
self.monitor.log_query_processed(user_query, processing_time, len(documents))
return response
except Exception as e:
self.monitor.log_error(user_query, str(e))
raise
class RAGMonitoring:
def __init__(self):
self.metrics = {
'queries_processed': 0,
'avg_processing_time': 0,
'cache_hit_rate': 0,
'error_rate': 0,
'retrieval_accuracy': 0
}
def log_query_processed(self, query, processing_time, num_documents):
"""Log successful query processing"""
self.metrics['queries_processed'] += 1
# Update average processing time
current_avg = self.metrics['avg_processing_time']
new_avg = ((current_avg * (self.metrics['queries_processed'] - 1) +
processing_time) / self.metrics['queries_processed'])
self.metrics['avg_processing_time'] = new_avg
# Log to monitoring service
self.send_metric('query_processed', {
'processing_time': processing_time,
'num_documents': num_documents,
'timestamp': time.time()
})
def generate_dashboard(self):
"""Generate monitoring dashboard data"""
return {
'system_health': self.assess_system_health(),
'performance_metrics': self.metrics,
'recent_errors': self.get_recent_errors(),
'top_queries': self.get_top_queries(),
'retrieval_stats': self.get_retrieval_stats()
}
```
## Common Challenges and Solutions
### 1. Context Window Limitations
```python
import tiktoken

class ContextWindowManager:
def __init__(self, max_tokens=4000):
self.max_tokens = max_tokens
self.tokenizer = tiktoken.get_encoding("cl100k_base") # GPT tokenizer
def manage_context(self, query, documents):
"""Intelligently manage context within token limits"""
# Reserve tokens for query, system prompt, and response
available_tokens = self.max_tokens - 1000
# Prioritize documents by relevance score
sorted_docs = sorted(documents, key=lambda x: x.relevance_score, reverse=True)
selected_docs = []
current_tokens = 0
for doc in sorted_docs:
doc_tokens = len(self.tokenizer.encode(doc.content))
if current_tokens + doc_tokens <= available_tokens:
selected_docs.append(doc)
current_tokens += doc_tokens
else:
# Try to fit partial document
remaining_tokens = available_tokens - current_tokens
if remaining_tokens > 200: # Minimum meaningful chunk
truncated_content = self.truncate_document(
doc.content, remaining_tokens
)
selected_docs.append(DocumentChunk(
content=truncated_content,
metadata={**doc.metadata, 'truncated': True}
))
break
return selected_docs
def truncate_document(self, content, max_tokens):
"""Intelligently truncate document to fit token limit"""
# Split into sentences
sentences = content.split('. ')
truncated_sentences = []
current_tokens = 0
for sentence in sentences:
sentence_tokens = len(self.tokenizer.encode(sentence))
if current_tokens + sentence_tokens <= max_tokens:
truncated_sentences.append(sentence)
current_tokens += sentence_tokens
else:
break
return '. '.join(truncated_sentences)
```
### 2. Retrieval Quality Issues
```python
class RetrievalQualityImprover:
def __init__(self, feedback_loop=True):
self.feedback_loop = feedback_loop
self.query_performance_log = {}
def improve_retrieval_iteratively(self, query, initial_results, llm):
"""Iteratively improve retrieval quality"""
improved_results = initial_results
# Analyze initial results
quality_assessment = self.assess_retrieval_quality(query, initial_results)
if quality_assessment['score'] < 0.7: # Needs improvement
# Try different retrieval strategies
strategies = [
'query_expansion',
'semantic_search',
'hybrid_search',
'multi_vector_search'
]
for strategy in strategies:
enhanced_results = self.apply_strategy(query, strategy)
strategy_quality = self.assess_retrieval_quality(query, enhanced_results)
if strategy_quality['score'] > quality_assessment['score']:
improved_results = enhanced_results
quality_assessment = strategy_quality
# Log performance for learning
if self.feedback_loop:
self.log_query_performance(query, improved_results, quality_assessment)
return improved_results
def apply_strategy(self, query, strategy):
"""Apply specific retrieval improvement strategy"""
if strategy == 'query_expansion':
return self.expand_query_and_search(query)
elif strategy == 'semantic_search':
return self.semantic_search_with_reranking(query)
elif strategy == 'hybrid_search':
return self.hybrid_dense_sparse_search(query)
elif strategy == 'multi_vector_search':
return self.multi_vector_search(query)
return []
```
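`assess_retrieval_quality` is left undefined above. A simple heuristic version for `RetrievalQualityImprover`, returning the `{'score': ...}` shape the caller expects and scoring results on query-term coverage plus result count, might look like:

```python
def assess_retrieval_quality(self, query, results, expected_count=5):
    """Heuristic quality score in [0, 1] based on term coverage and result count."""
    if not results:
        return {'score': 0.0, 'reason': 'no documents retrieved'}
    query_terms = set(query.lower().split())
    coverages = []
    for doc in results:
        content = doc.content.lower()
        matched = sum(1 for term in query_terms if term in content)
        coverages.append(matched / max(len(query_terms), 1))
    term_coverage = sum(coverages) / len(coverages)
    count_ratio = min(len(results) / expected_count, 1.0)
    # Weight term coverage more heavily than raw result count
    score = 0.7 * term_coverage + 0.3 * count_ratio
    return {'score': score, 'term_coverage': term_coverage, 'result_count': len(results)}
```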
### 3. Hallucination Reduction
```python
class HallucinationGuard:
def __init__(self, llm):
self.llm = llm
self.confidence_threshold = 0.7
def generate_with_hallucination_check(self, query, context):
"""Generate response with hallucination detection and prevention"""
# Initial generation
response = self.generate_response(query, context)
# Check for potential hallucinations
hallucination_score = self.detect_hallucination(response, context)
if hallucination_score > self.confidence_threshold:
# High risk of hallucination - regenerate with constraints
constrained_response = self.generate_constrained_response(query, context)
return constrained_response
return response
def detect_hallucination(self, response, context):
"""Detect potential hallucinations in response"""
detection_prompt = f"""
Context: {context}
Response: {response}
Does the response contain information that is not present in or
contradicted by the context? Consider:
1. Facts stated that aren't in the context
2. Numbers or dates not mentioned in context
3. Claims that contradict the context
Rate the likelihood of hallucination from 0.0 to 1.0.
Provide only the number.
"""
result = self.llm.generate(detection_prompt)
try:
return float(result.strip())
except ValueError:
return 0.5 # Default to moderate risk
def generate_constrained_response(self, query, context):
"""Generate response with strict constraints to prevent hallucination"""
constrained_prompt = f"""
Based ONLY on the provided context, answer the following question.
If the context doesn't contain enough information to answer the question,
explicitly state that the information is not available in the provided context.
IMPORTANT CONSTRAINTS:
- Only use information directly stated in the context
- Do not infer or extrapolate beyond what is explicitly stated
- If uncertain about any detail, indicate the uncertainty
- Cite specific parts of the context when making claims
Context: {context}
Question: {query}
Answer:"""
return self.llm.generate(constrained_prompt)
```
Retrieval-Augmented Generation represents a significant advancement in making AI systems more accurate, up-to-date, and
grounded in factual information. By combining the reasoning capabilities of large language models with the vast and
current information available in external knowledge bases, RAG systems offer a practical solution to many limitations of
standalone language models. As the technology continues to evolve, we can expect to see more sophisticated retrieval
strategies, better integration techniques, and increasingly robust production deployments that make RAG an essential
component of enterprise AI applications.