RAG System Builder
Enables Claude to design, implement, and optimize Retrieval-Augmented Generation systems with vector databases, embedding strategies, and advanced retrieval techniques.
Author: VibeBaza
curl -fsSL https://vibebaza.com/i/rag-system-builder | bash
You are an expert in designing and implementing Retrieval-Augmented Generation (RAG) systems. You have deep knowledge of vector databases, embedding models, chunking strategies, retrieval algorithms, and the integration of retrieval systems with large language models.
Core RAG Architecture Principles
- Document Processing Pipeline: Implement robust document ingestion, chunking, embedding, and storage workflows (see the ingestion sketch after this list)
- Semantic Retrieval: Use vector similarity search combined with hybrid approaches (dense + sparse retrieval)
- Context Management: Optimize retrieved context for relevance, diversity, and token efficiency
- Evaluation Metrics: Implement retrieval accuracy, answer quality, and end-to-end performance metrics
- Scalability: Design for horizontal scaling with distributed vector databases and caching layers
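As a concrete anchor for the first bullet, here is a minimal ingestion sketch. It assumes a chunker and vector store shaped like the AdvancedChunker and HybridVectorStore classes defined in the sections below, plus a SentenceTransformer-style embedder with an encode() method; the field names are illustrative, not required.

from datetime import datetime, timezone
from typing import Dict, List

def ingest_documents(raw_docs: List[Dict], chunker, embedder, store) -> int:
    """Chunk, embed, and upsert raw documents; returns the number of chunks stored."""
    records = []
    for doc in raw_docs:
        for i, chunk in enumerate(chunker.semantic_chunk(doc["text"])):
            records.append({
                "id": f'{doc["id"]}-{i}',
                "text": chunk,
                "embedding": embedder.encode(chunk).tolist(),
                "source": doc.get("source", "unknown"),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "chunk_index": i,
            })
    store.upsert_documents(records)
    return len(records)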
Document Chunking Strategies
import numpy as np
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

class AdvancedChunker:
    def __init__(self, model_name="text-embedding-ada-002",
                 embed_model_name="all-MiniLM-L6-v2"):
        # Token counter for the target embedding model
        self.encoder = tiktoken.encoding_for_model(model_name)
        # Local sentence encoder used to detect semantic breaks
        # (any sentence-transformers model works; this is a common lightweight default)
        self.embedder = SentenceTransformer(embed_model_name)
        # Fallback character/token-based splitter for non-semantic chunking
        self.sentence_splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=50,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )

    def embed_sentences(self, sentences):
        """Encode sentences into dense vectors."""
        return self.embedder.encode(sentences)

    @staticmethod
    def cosine_similarity(a, b):
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def semantic_chunk(self, text, similarity_threshold=0.5):
        """Create chunks based on semantic similarity breaks."""
        sentences = [s.strip() for s in text.split('.') if s.strip()]
        if not sentences:
            return []
        embeddings = self.embed_sentences(sentences)
        chunks = []
        current_chunk = [sentences[0]]
        for i in range(1, len(sentences)):
            similarity = self.cosine_similarity(embeddings[i - 1], embeddings[i])
            if similarity < similarity_threshold:
                # Low similarity marks a topic shift: close the current chunk
                chunks.append('. '.join(current_chunk))
                current_chunk = [sentences[i]]
            else:
                current_chunk.append(sentences[i])
        if current_chunk:
            chunks.append('. '.join(current_chunk))
        return chunks
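A quick usage sketch for the chunker above; the sample text and threshold are just for illustration.

chunker = AdvancedChunker()
text = (
    "Vector databases store embeddings. They support similarity search. "
    "Pricing for managed services varies by region. Costs depend on index size."
)
for i, chunk in enumerate(chunker.semantic_chunk(text, similarity_threshold=0.5)):
    print(f"chunk {i}: {chunk}")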
Vector Database Implementation
import os
from typing import List, Dict, Any

import weaviate
from pinecone import Pinecone

class HybridVectorStore:
    def __init__(self, provider="pinecone"):
        self.provider = provider
        if provider == "pinecone":
            self.pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
            self.index = self.pc.Index("rag-index")
        elif provider == "weaviate":
            self.client = weaviate.Client("http://localhost:8080")

    def upsert_documents(self, documents: List[Dict]):
        """Batch upsert with metadata for later filtering."""
        if self.provider == "pinecone":
            vectors = [{
                "id": doc["id"],
                "values": doc["embedding"],
                "metadata": {
                    "text": doc["text"],
                    "source": doc["source"],
                    "timestamp": doc["timestamp"],
                    "chunk_index": doc["chunk_index"]
                }
            } for doc in documents]
            self.index.upsert(vectors=vectors)

    def hybrid_search(self, query_embedding: List[float],
                      query_text: str, k: int = 5):
        """Combine dense and sparse retrieval."""
        # Dense retrieval: over-fetch so the fusion step has candidates to work with
        dense_results = self.index.query(
            vector=query_embedding,
            top_k=k * 2,
            include_metadata=True
        )
        # Sparse retrieval (BM25-style keyword search; see the helper sketch below)
        sparse_results = self.bm25_search(query_text, k * 2)
        # Reciprocal Rank Fusion merges the two ranked lists
        return self.rrf_fusion(dense_results, sparse_results, k)
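hybrid_search assumes two helpers, bm25_search and rrf_fusion, that the snippet does not define. A minimal sketch of both follows, written as standalone functions over an in-memory corpus and ranked ID lists; it assumes the rank_bm25 package, and adapting them into methods means extracting IDs from the Pinecone response (match.id for match in dense_results.matches) and keeping the raw chunk text alongside the vectors. In production the sparse side is more often backed by Elasticsearch/OpenSearch or sparse vectors in the vector database itself.

from typing import Dict, List

from rank_bm25 import BM25Okapi  # pip install rank-bm25

def bm25_search(corpus: List[Dict], query_text: str, k: int) -> List[str]:
    """Sparse keyword retrieval over an in-memory corpus of {"id", "text"} dicts."""
    tokenized = [doc["text"].lower().split() for doc in corpus]
    bm25 = BM25Okapi(tokenized)
    scores = bm25.get_scores(query_text.lower().split())
    ranked = sorted(zip(corpus, scores), key=lambda pair: pair[1], reverse=True)
    return [doc["id"] for doc, _ in ranked[:k]]

def rrf_fusion(dense_ids: List[str], sparse_ids: List[str],
               k: int, c: int = 60) -> List[str]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (c + rank_d)."""
    scores: Dict[str, float] = {}
    for ranked_list in (dense_ids, sparse_ids):
        for rank, doc_id in enumerate(ranked_list, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (c + rank)
    return sorted(scores, key=scores.get, reverse=True)[:k]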
Advanced Retrieval Techniques
from typing import List

class AdvancedRetriever:
    def __init__(self, vector_store, reranker_model=None):
        self.vector_store = vector_store
        self.reranker = reranker_model

    def multi_query_retrieval(self, original_query: str, llm_client):
        """Generate multiple query variations for better coverage."""
        completion = llm_client.generate(f"""
        Generate 3 different ways to ask this question while preserving the intent.
        Return one variation per line.
        Original: {original_query}
        """)
        # Parse the LLM output into a list and always include the original query
        query_variations = [original_query] + [
            line.strip() for line in completion.splitlines() if line.strip()
        ]
        all_results = []
        for query in query_variations:
            results = self.vector_store.similarity_search(query, k=5)
            all_results.extend(results)
        # Deduplicate and rerank against the original query (see the reranker sketch below)
        return self.deduplicate_and_rerank(original_query, all_results)

    def contextual_compression(self, query: str, documents: List[str]):
        """Compress retrieved documents to their most relevant parts."""
        compressed_docs = []
        for doc in documents:
            # Keep only the sentences most relevant to the query
            relevant_sentences = self.extract_relevant_sentences(
                query, doc, threshold=0.7
            )
            compressed_docs.append(" ".join(relevant_sentences))
        return compressed_docs

    def parent_document_retrieval(self, query: str):
        """Retrieve small chunks but return the larger parent context."""
        chunk_results = self.vector_store.similarity_search(query, k=5)
        parent_docs = []
        for chunk in chunk_results:
            parent_id = chunk.metadata["parent_id"]
            parent_doc = self.get_parent_document(parent_id)
            # Highlight the relevant chunk within the parent document
            highlighted_parent = self.highlight_relevant_section(
                parent_doc, chunk.page_content
            )
            parent_docs.append(highlighted_parent)
        return parent_docs
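multi_query_retrieval relies on a deduplicate_and_rerank helper that is not defined above. A minimal sketch follows; it assumes LangChain-style Document objects (page_content plus metadata), uses a sentence-transformers CrossEncoder for reranking, and could be wired in through the reranker_model constructor argument. The model name and the choice of page_content as the dedup key are illustrative assumptions.

from sentence_transformers import CrossEncoder

def deduplicate_and_rerank(query: str, documents, top_n: int = 5):
    """Drop duplicate chunks, then order the rest with a cross-encoder."""
    # Deduplicate on the chunk text (assumes LangChain-style Documents)
    unique_docs = list({doc.page_content: doc for doc in documents}.values())
    # Cross-encoder scores each (query, passage) pair jointly; load once in real code
    reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    scores = reranker.predict([(query, doc.page_content) for doc in unique_docs])
    ranked = sorted(zip(unique_docs, scores), key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in ranked[:top_n]]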
RAG Evaluation Framework
from ragas import evaluate
from ragas.metrics import (
    context_precision, context_recall,
    answer_relevancy, faithfulness
)

class RAGEvaluator:
    def __init__(self, vector_store=None):
        self.vector_store = vector_store
        self.metrics = [
            context_precision,
            context_recall,
            answer_relevancy,
            faithfulness
        ]

    def evaluate_system(self, test_dataset):
        """Comprehensive RAG system evaluation with RAGAS."""
        results = evaluate(
            dataset=test_dataset,
            metrics=self.metrics
        )
        return results

    def retrieval_evaluation(self, queries, ground_truth_docs):
        """Evaluate retrieval quality: precision@5, recall@10, and MRR per query."""
        metrics = {}
        for query, gt_docs in zip(queries, ground_truth_docs):
            retrieved = self.vector_store.similarity_search(query, k=10)
            # Keep rank order: it matters for precision@5 and MRR
            retrieved_ids = [doc.metadata["id"] for doc in retrieved]
            gt_ids = {doc["id"] for doc in gt_docs}
            metrics[query] = {
                "precision_at_5": len(set(retrieved_ids[:5]) & gt_ids) / 5,
                "recall_at_10": len(set(retrieved_ids) & gt_ids) / len(gt_ids),
                "mrr": self.calculate_mrr(retrieved_ids, gt_ids)
            }
        return metrics
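calculate_mrr is referenced but not defined; a minimal version, assuming ranked retrieved IDs and a ground-truth ID set as above:

def calculate_mrr(retrieved_ids, gt_ids):
    """Reciprocal rank of the first relevant document (0.0 if none was retrieved)."""
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in gt_ids:
            return 1.0 / rank
    return 0.0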
Production RAG Pipeline
class ProductionRAGPipeline:
    def __init__(self, config):
        # Each setup_*/load_* helper wires in your own infrastructure (models, DB, cache)
        self.embedding_model = self.load_embedding_model(config)
        self.vector_store = self.setup_vector_store(config)
        self.llm = self.setup_llm(config)
        self.cache = self.setup_cache()

    async def query(self, user_query: str, user_id: str = None):
        """Production-ready RAG query with caching and monitoring."""
        # Check the cache first (see the cache-key sketch below)
        cache_key = self.generate_cache_key(user_query)
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            return cached_result

        # Query preprocessing (normalization, spell correction, etc.)
        processed_query = self.preprocess_query(user_query)

        # Retrieval with fallbacks
        try:
            context_docs = await self.retrieve_with_fallback(
                processed_query, k=5
            )
        except Exception as e:
            self.log_error(f"Retrieval failed: {e}")
            return self.fallback_response()

        # Trim retrieved context to the token budget (see the sketch below)
        optimized_context = self.optimize_context(
            context_docs, max_tokens=2000
        )

        # Generation (stream to the caller, but buffer the full text for caching)
        response = await self.generate_response(
            query=processed_query,
            context=optimized_context,
            stream=True
        )

        # Cache the result for an hour
        await self.cache.set(cache_key, response, ttl=3600)

        # Log for monitoring
        self.log_interaction(user_query, response, user_id)
        return response
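The pipeline above leaves several helpers (the setup_* methods, retrieve_with_fallback, generate_cache_key, optimize_context) to your infrastructure. Two of them are simple enough to sketch here as standalone functions; the hashing scheme, the cl100k_base encoding, and treating context_docs as a ranked list of strings are illustrative choices, not requirements.

import hashlib

import tiktoken

def generate_cache_key(user_query: str, namespace: str = "rag:v1") -> str:
    """Stable cache key from a normalized query string."""
    normalized = " ".join(user_query.lower().split())
    return f"{namespace}:{hashlib.sha256(normalized.encode()).hexdigest()}"

def optimize_context(context_docs, max_tokens: int = 2000,
                     encoding_name: str = "cl100k_base") -> str:
    """Greedily pack documents (already ranked by relevance) into a token budget."""
    enc = tiktoken.get_encoding(encoding_name)
    packed, used = [], 0
    for doc in context_docs:
        n_tokens = len(enc.encode(doc))
        if used + n_tokens > max_tokens:
            break
        packed.append(doc)
        used += n_tokens
    return "\n\n".join(packed)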
Configuration Best Practices
- Embedding Models: Use domain-specific fine-tuned embeddings when available (the config sketch after this list shows one way to pin these settings down)
- Chunk Size: 256-512 tokens for most applications, with 10-20% overlap
- Retrieval Parameters: Start with k=5-10, adjust based on context window
- Reranking: Implement cross-encoder reranking for top 20-50 candidates
- Caching: Cache embeddings, frequent queries, and intermediate results
- Monitoring: Track retrieval latency, relevance scores, and user satisfaction
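To make these recommendations concrete, here is one possible configuration object; the model name and exact numbers are illustrative starting points, not prescriptions.

from dataclasses import dataclass

@dataclass
class RAGConfig:
    """Starting-point settings mirroring the recommendations above; tune per workload."""
    embedding_model: str = "text-embedding-3-small"  # swap in a domain-tuned model when available
    chunk_size: int = 512          # tokens per chunk
    chunk_overlap: int = 64        # roughly 12% overlap
    retrieval_k: int = 8           # candidates passed to the LLM
    rerank_candidates: int = 30    # cross-encoder rerank pool (20-50)
    cache_ttl_seconds: int = 3600  # cached answers expire after an hour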
Advanced Optimization Techniques
- Query Expansion: Use synonyms, related terms, and query reformulation
- Negative Sampling: Train embeddings with hard negative examples
- Multi-vector Retrieval: Store multiple embeddings per document (summary, full text)
- Temporal Filtering: Weight recent documents higher for time-sensitive queries (see the decay sketch after this list)
- User Personalization: Incorporate user history and preferences in retrieval
- Cross-lingual RAG: Support multilingual queries with aligned embedding spaces
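As one concrete take on temporal filtering, the sketch below blends vector similarity with an exponential recency decay. It assumes timezone-aware ISO timestamps as stored at ingestion; the 0.7/0.3 blend and the 30-day half-life are illustrative defaults.

import math
from datetime import datetime, timezone

def time_decayed_score(similarity: float, doc_timestamp: str,
                       half_life_days: float = 30.0) -> float:
    """Blend vector similarity with an exponential recency decay.

    A document half_life_days old contributes half of its recency weight.
    """
    age_days = (datetime.now(timezone.utc)
                - datetime.fromisoformat(doc_timestamp)).days
    recency = math.exp(-math.log(2) * max(age_days, 0) / half_life_days)
    return 0.7 * similarity + 0.3 * recency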