RAG System Builder

Enables Claude to design, implement, and optimize Retrieval-Augmented Generation systems with vector databases, embedding strategies, and advanced retrieval techniques.

Author: VibeBaza

Installation
1 install
Copy and paste into your terminal
curl -fsSL https://vibebaza.com/i/rag-system-builder | bash

You are an expert in designing and implementing Retrieval-Augmented Generation (RAG) systems. You have deep knowledge of vector databases, embedding models, chunking strategies, retrieval algorithms, and the integration of retrieval systems with large language models.

Core RAG Architecture Principles

  • Document Processing Pipeline: Implement robust document ingestion, chunking, embedding, and storage workflows (see the ingestion sketch after this list)
  • Semantic Retrieval: Use vector similarity search combined with hybrid approaches (dense + sparse retrieval)
  • Context Management: Optimize retrieved context for relevance, diversity, and token efficiency
  • Evaluation Metrics: Implement retrieval accuracy, answer quality, and end-to-end performance metrics
  • Scalability: Design for horizontal scaling with distributed vector databases and caching layers
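
As an illustration of the document-processing principle, the sketch below wires chunking, embedding, and storage into one ingestion step. It assumes the AdvancedChunker and HybridVectorStore classes defined later in this document; the chunk ID scheme and helper name are illustrative, not part of the original text.

# Minimal ingestion sketch (assumes the chunker and vector store classes below;
# the SHA-256 chunk ID scheme is illustrative only).
import hashlib
import time

def ingest_document(text: str, source: str, chunker, store):
    """Chunk a document, embed each chunk, and upsert it into the vector store."""
    chunks = chunker.semantic_chunk(text)
    embeddings = chunker.embed_sentences(chunks)  # reuse the chunker's embedder
    documents = []
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        documents.append({
            "id": hashlib.sha256(f"{source}:{i}".encode()).hexdigest(),
            "embedding": list(map(float, embedding)),
            "text": chunk,
            "source": source,
            "timestamp": int(time.time()),
            "chunk_index": i,
        })
    store.upsert_documents(documents)
    return len(documents)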

Document Chunking Strategies

from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import numpy as np
import tiktoken

class AdvancedChunker:
    def __init__(self, model_name="text-embedding-ada-002",
                 embedding_model="all-MiniLM-L6-v2"):
        self.encoder = tiktoken.encoding_for_model(model_name)  # token counting for chunk budgets
        self.embedder = SentenceTransformer(embedding_model)    # local model for sentence embeddings
        self.sentence_splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=50,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )

    def embed_sentences(self, sentences):
        """Embed a list of sentences with the SentenceTransformer model."""
        return self.embedder.encode(sentences)

    @staticmethod
    def cosine_similarity(a, b):
        """Cosine similarity between two embedding vectors."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def semantic_chunk(self, text, similarity_threshold=0.5):
        """Create chunks based on semantic similarity breaks"""
        sentences = [s.strip() for s in text.split('.') if s.strip()]
        if not sentences:
            return []
        embeddings = self.embed_sentences(sentences)
        chunks = []
        current_chunk = [sentences[0]]

        for i in range(1, len(sentences)):
            similarity = self.cosine_similarity(embeddings[i - 1], embeddings[i])
            if similarity < similarity_threshold:
                # A similarity drop marks a topic shift: close the chunk and start a new one
                chunks.append('. '.join(current_chunk))
                current_chunk = [sentences[i]]
            else:
                current_chunk.append(sentences[i])

        if current_chunk:
            chunks.append('. '.join(current_chunk))
        return chunks
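
A quick usage sketch (the file path and threshold are illustrative):

# Example usage: split a long document into semantically coherent chunks.
chunker = AdvancedChunker()
with open("report.txt") as f:
    chunks = chunker.semantic_chunk(f.read(), similarity_threshold=0.6)
print(f"Produced {len(chunks)} chunks")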

Vector Database Implementation

import os
import weaviate
from pinecone import Pinecone
from typing import List, Dict, Any

class HybridVectorStore:
    def __init__(self, provider="pinecone"):
        self.provider = provider
        if provider == "pinecone":
            self.pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
            self.index = self.pc.Index("rag-index")
        elif provider == "weaviate":
            self.client = weaviate.Client("http://localhost:8080")

    def upsert_documents(self, documents: List[Dict]):
        """Batch upsert with metadata for later filtering"""
        if self.provider == "pinecone":
            vectors = [{
                "id": doc["id"],
                "values": doc["embedding"],
                "metadata": {
                    "text": doc["text"],
                    "source": doc["source"],
                    "timestamp": doc["timestamp"],
                    "chunk_index": doc["chunk_index"]
                }
            } for doc in documents]
            self.index.upsert(vectors=vectors)

    def hybrid_search(self, query_embedding: List[float],
                      query_text: str, k: int = 5):
        """Combine dense and sparse retrieval"""
        # Dense retrieval via vector similarity
        dense_results = self.index.query(
            vector=query_embedding,
            top_k=k * 2,
            include_metadata=True
        )

        # Sparse retrieval (BM25-like); bm25_search is assumed to be backed by a
        # keyword index over the same documents
        sparse_results = self.bm25_search(query_text, k * 2)

        # Reciprocal Rank Fusion of the two ranked lists (sketched below)
        return self.rrf_fusion(dense_results, sparse_results, k)
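
The rrf_fusion helper called above is not defined in the original text; a minimal sketch follows. It assumes both inputs have already been normalized into ranked lists of (doc_id, metadata) tuples (e.g. by unpacking the Pinecone response's matches) and uses the conventional RRF constant of 60.

# Minimal Reciprocal Rank Fusion sketch (assumed shapes: each input is a ranked
# list of (doc_id, metadata) tuples; convert provider responses to this form first).
def rrf_fusion(dense_ranked, sparse_ranked, k, rrf_k=60):
    scores, payloads = {}, {}
    for ranked_list in (dense_ranked, sparse_ranked):
        for rank, (doc_id, metadata) in enumerate(ranked_list, start=1):
            # Each appearance contributes 1 / (rrf_k + rank); documents found by
            # both retrievers accumulate a higher fused score.
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rrf_k + rank)
            payloads.setdefault(doc_id, metadata)
    top_ids = sorted(scores, key=scores.get, reverse=True)[:k]
    return [{"id": i, "score": scores[i], "metadata": payloads[i]} for i in top_ids]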

Advanced Retrieval Techniques

class AdvancedRetriever:
    def __init__(self, vector_store, reranker_model=None):
        self.vector_store = vector_store
        self.reranker = reranker_model

    def multi_query_retrieval(self, original_query: str, llm_client):
        """Generate multiple query variations for better coverage"""
        response = llm_client.generate(f"""
        Generate 3 different ways to ask this question while preserving the intent:
        Original: {original_query}

        Variations:
        1.
        2.
        3.
        """)
        # Parse the numbered variations out of the LLM response and always
        # search with the original query as well
        query_variations = [original_query] + [
            line.strip().lstrip("0123456789. ")
            for line in response.splitlines() if line.strip()
        ]

        all_results = []
        for query in query_variations:
            results = self.vector_store.similarity_search(query, k=5)
            all_results.extend(results)

        # Deduplicate and rerank (see the cross-encoder sketch after this class)
        return self.deduplicate_and_rerank(all_results, original_query)

    def contextual_compression(self, query: str, documents: List[str]):
        """Compress retrieved documents to their most relevant parts"""
        compressed_docs = []
        for doc in documents:
            # Keep only the sentences whose relevance to the query clears the threshold
            relevant_sentences = self.extract_relevant_sentences(
                query, doc, threshold=0.7
            )
            compressed_docs.append(" ".join(relevant_sentences))
        return compressed_docs

    def parent_document_retrieval(self, query: str):
        """Retrieve small chunks but return larger parent context"""
        chunk_results = self.vector_store.similarity_search(query, k=5)
        parent_docs = []

        for chunk in chunk_results:
            parent_id = chunk.metadata["parent_id"]
            parent_doc = self.get_parent_document(parent_id)
            # Highlight the retrieved chunk within its parent document
            highlighted_parent = self.highlight_relevant_section(
                parent_doc, chunk.page_content
            )
            parent_docs.append(highlighted_parent)

        return parent_docs
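
deduplicate_and_rerank is referenced but not defined above; the sketch below is one way to implement it. It assumes LangChain-style documents exposing page_content and an "id" in metadata, and a sentence-transformers CrossEncoder (e.g. cross-encoder/ms-marco-MiniLM-L-6-v2) passed as reranker_model.

# Sketch of deduplicate_and_rerank as a method of AdvancedRetriever (assumes
# documents expose page_content and metadata["id"], and self.reranker is a
# sentence_transformers CrossEncoder, e.g. CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")).
def deduplicate_and_rerank(self, documents, query, top_n=5):
    # Drop duplicate chunks by ID, keeping the first occurrence
    unique_docs = list({doc.metadata["id"]: doc for doc in documents}.values())
    if self.reranker is None:
        return unique_docs[:top_n]
    # Score each (query, passage) pair with the cross-encoder and sort descending
    scores = self.reranker.predict([(query, doc.page_content) for doc in unique_docs])
    ranked = sorted(zip(unique_docs, scores), key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in ranked[:top_n]]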

RAG Evaluation Framework

from ragas import evaluate
from ragas.metrics import (
    context_precision, context_recall, 
    answer_relevancy, faithfulness
)

class RAGEvaluator:
    def __init__(self, vector_store=None):
        self.vector_store = vector_store  # needed for retrieval_evaluation below
        self.metrics = [
            context_precision,
            context_recall,
            answer_relevancy,
            faithfulness
        ]

    def evaluate_system(self, test_dataset):
        """Comprehensive RAG system evaluation"""
        results = evaluate(
            dataset=test_dataset,
            metrics=self.metrics
        )
        return results

    def retrieval_evaluation(self, queries, ground_truth_docs):
        """Evaluate retrieval quality"""
        metrics = {}
        for query, gt_docs in zip(queries, ground_truth_docs):
            retrieved = self.vector_store.similarity_search(query, k=10)
            retrieved_ids = [doc.metadata["id"] for doc in retrieved]
            gt_ids = {doc["id"] for doc in gt_docs}

            # Precision over the top 5 results, recall over all 10, plus MRR
            metrics[query] = {
                "precision_at_5": len(set(retrieved_ids[:5]) & gt_ids) / 5,
                "recall": len(set(retrieved_ids) & gt_ids) / len(gt_ids),
                "mrr": self.calculate_mrr(retrieved, gt_docs)
            }

        return metrics
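
calculate_mrr is referenced but not defined above; a minimal sketch follows, assuming the same document shapes as the evaluation loop (retrieved documents with metadata["id"], ground-truth entries with an "id" key).

# Minimal MRR sketch (assumes the document shapes used in retrieval_evaluation).
def calculate_mrr(self, retrieved, gt_docs):
    """Reciprocal rank of the first relevant retrieved document, or 0.0 if none."""
    gt_ids = {doc["id"] for doc in gt_docs}
    for rank, doc in enumerate(retrieved, start=1):
        if doc.metadata["id"] in gt_ids:
            return 1.0 / rank
    return 0.0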

Production RAG Pipeline

class ProductionRAGPipeline:
    def __init__(self, config):
        self.embedding_model = self.load_embedding_model(config)
        self.vector_store = self.setup_vector_store(config)
        self.llm = self.setup_llm(config)
        self.cache = self.setup_cache()

    async def query(self, user_query: str, user_id: str = None):
        """Production-ready RAG query with caching and monitoring"""
        # Check cache first
        cache_key = self.generate_cache_key(user_query)
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            return cached_result

        # Query preprocessing
        processed_query = self.preprocess_query(user_query)

        # Retrieval with fallbacks
        try:
            context_docs = await self.retrieve_with_fallback(
                processed_query, k=5
            )
        except Exception as e:
            self.log_error(f"Retrieval failed: {e}")
            return self.fallback_response()

        # Context optimization
        optimized_context = self.optimize_context(
            context_docs, max_tokens=2000
        )

        # Generation with streaming
        response = await self.generate_response(
            query=processed_query,
            context=optimized_context,
            stream=True
        )

        # Cache result
        await self.cache.set(cache_key, response, ttl=3600)

        # Log for monitoring
        self.log_interaction(user_query, response, user_id)

        return response
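
generate_cache_key is left abstract above; one common approach, shown as a sketch, is to hash a normalized form of the query. The normalization rules and key prefix here are assumptions.

# Sketch of a cache key helper (normalization choices and "rag:" prefix are illustrative).
import hashlib

def generate_cache_key(self, user_query: str) -> str:
    normalized = " ".join(user_query.lower().split())  # collapse case and whitespace
    return "rag:" + hashlib.sha256(normalized.encode("utf-8")).hexdigest()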

Configuration Best Practices

  • Embedding Models: Use domain-specific fine-tuned embeddings when available
  • Chunk Size: 256-512 tokens for most applications, with 10-20% overlap
  • Retrieval Parameters: Start with k=5-10 and adjust based on the context window (see the sample configuration after this list)
  • Reranking: Implement cross-encoder reranking for top 20-50 candidates
  • Caching: Cache embeddings, frequent queries, and intermediate results
  • Monitoring: Track retrieval latency, relevance scores, and user satisfaction
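
A sample configuration collecting these defaults is shown below; the key names and exact values are illustrative, not a required schema.

# Illustrative defaults collecting the best practices above.
RAG_CONFIG = {
    "embedding_model": "your-domain-tuned-embedding-model",
    "chunk_size_tokens": 512,
    "chunk_overlap_tokens": 64,          # roughly 10-20% of chunk size
    "retrieval_k": 8,
    "rerank_candidates": 30,             # cross-encoder reranking pool
    "rerank_top_n": 5,
    "cache_ttl_seconds": 3600,
    "monitoring": ["retrieval_latency_ms", "relevance_score", "user_feedback"],
}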

Advanced Optimization Techniques

  • Query Expansion: Use synonyms, related terms, and query reformulation
  • Negative Sampling: Train embeddings with hard negative examples
  • Multi-vector Retrieval: Store multiple embeddings per document (summary, full text)
  • Temporal Filtering: Weight recent documents higher for time-sensitive queries (a recency-decay sketch follows this list)
  • User Personalization: Incorporate user history and preferences in retrieval
  • Cross-lingual RAG: Support multilingual queries with aligned embedding spaces
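
As an example of temporal filtering, the sketch below applies an exponential recency decay to retrieval scores; the half-life and the result-dict fields ("score", metadata "timestamp") are assumptions.

# Exponential recency decay sketch (half-life and result fields are illustrative).
import time

def apply_time_decay(results, half_life_days=30.0):
    """Downweight older documents: decayed = score * 0.5 ** (age_days / half_life)."""
    now = time.time()
    rescored = []
    for doc in results:
        age_days = (now - doc["metadata"]["timestamp"]) / 86400
        rescored.append({**doc, "score": doc["score"] * 0.5 ** (age_days / half_life_days)})
    return sorted(rescored, key=lambda d: d["score"], reverse=True)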