Building an Advanced RAG System: Production-Ready Q&A with Vector Databases

Retrieval-Augmented Generation (RAG) is revolutionizing how we build Q&A systems. In this post, I'll show you how to build a production-ready RAG system with hybrid search, evaluation metrics, and query optimization.

🎯 What is RAG?

RAG combines:

  1. Retrieval: Finding relevant documents from a knowledge base
  2. Augmentation: Adding retrieved context to LLM prompts
  3. Generation: LLM generates answers using the context

Result: Accurate, grounded answers with citations!
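
Conceptually, the whole pipeline is three calls. Here is a schematic sketch (retrieve, build_prompt, and llm are placeholders for the real components built below):

def rag_answer(question: str) -> str:
    chunks = retrieve(question)                        # 1. Retrieval
    prompt = build_prompt(context=chunks, q=question)  # 2. Augmentation
    return llm(prompt)                                 # 3. Generation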

🏗️ System Architecture

Documents → Chunking → Embedding → Vector DB (ChromaDB)
User Query → Optimization → Hybrid Retrieval → Re-ranking
Retrieved Chunks + Query → LLM → Answer → RAGAS Evaluation

💻 Core Implementation

1. RAG Engine Setup

The heart of the system:

# Imports assume the pre-0.1 LangChain layout this post was written against;
# newer releases move these into langchain_openai / langchain_community.
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma

class RAGEngine:
    """Advanced RAG system with hybrid retrieval."""
    
    def __init__(self):
        # Initialize embeddings
        self.embeddings = OpenAIEmbeddings(
            api_key=settings.OPENAI_API_KEY
        )
        
        # Initialize LLM
        self.llm = ChatOpenAI(
            model=settings.LLM_MODEL,
            temperature=settings.TEMPERATURE
        )
        
        # Text splitter for chunking
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,  # 1000 chars
            chunk_overlap=settings.CHUNK_OVERLAP  # 200 chars
        )
        
        # Vector store
        self.vectorstore = Chroma(
            collection_name="rag_collection",
            embedding_function=self.embeddings,
            persist_directory=settings.VECTOR_DB_PATH
        )
        
        # Create QA chain
        self.qa_chain = self._create_qa_chain()

Key Design Decisions:

  • ✅ ChromaDB for easy local development
  • ✅ Chunk overlap for context continuity
  • ✅ Persistent storage for repeated use
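
The snippets reference a settings object throughout; a minimal stand-in might look like this (the values are illustrative defaults, not the post's actual config):

import os

class Settings:
    OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
    LLM_MODEL = "gpt-3.5-turbo"
    TEMPERATURE = 0.0
    CHUNK_SIZE = 1000      # chars per chunk
    CHUNK_OVERLAP = 200    # chars shared between neighboring chunks
    VECTOR_DB_PATH = "./chroma_db"
    TOP_K = 5              # chunks retrieved per query

settings = Settings()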

2. Document Ingestion Pipeline

Process and index documents:

def ingest_directory(self, directory_path: str) -> int:
    """Ingest all documents from a directory."""
    # Load documents. Path.glob doesn't support brace patterns like
    # "**/*.{pdf,txt,md}", so each extension gets its own pass.
    documents = []
    for pattern in ("**/*.pdf", "**/*.txt", "**/*.md"):
        loader = DirectoryLoader(
            directory_path,
            glob=pattern,
            show_progress=True
        )
        documents.extend(loader.load())
    
    # Split into chunks
    chunks = self.text_splitter.split_documents(documents)
    
    # Add to vector store
    self.vectorstore.add_documents(chunks)
    self.vectorstore.persist()
    
    return len(chunks)

def ingest_pdf(self, pdf_path: str) -> int:
    """Ingest a single PDF."""
    loader = PyPDFLoader(pdf_path)
    documents = loader.load()
    chunks = self.text_splitter.split_documents(documents)
    
    self.vectorstore.add_documents(chunks)
    self.vectorstore.persist()
    
    return len(chunks)

Why This Works:

  • Handles multiple file types
  • Maintains metadata (page numbers, sources)
  • Efficient chunking strategy
  • Persistent storage
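
Usage is then a couple of lines (the path is illustrative):

engine = RAGEngine()
num_chunks = engine.ingest_directory("./data/documents")
print(f"Indexed {num_chunks} chunks")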

3. Query System with Context

Retrieve and generate answers:

def _create_qa_chain(self):
    """Create the QA retrieval chain."""
    prompt_template = ChatPromptTemplate.from_messages([
        ("system", """You are a helpful assistant answering questions 
        based on provided context.
        
        Rules:
        1. Use ONLY the provided context to answer
        2. If the answer isn't in the context, say 
           "I don't have enough information"
        3. Be concise and accurate
        4. Cite sources when possible
        """),
        ("user", """Context: {context}
        
        Question: {question}
        
        Answer:""")
    ])
    
    return RetrievalQA.from_chain_type(
        llm=self.llm,
        chain_type="stuff",  # Can be "map_reduce" for long contexts
        chain_type_kwargs={"prompt": prompt_template},  # Wire in the prompt above
        retriever=self.vectorstore.as_retriever(
            search_kwargs={"k": settings.TOP_K}  # Top 5 chunks
        ),
        return_source_documents=True
    )

def query(
    self,
    question: str,
    top_k: Optional[int] = None,
    re_rank: bool = False
) -> Dict:
    """Query the RAG system.

    top_k and re_rank are accepted so the API layer can expose them;
    this basic chain ignores them (hybrid retrieval and re-ranking
    are wired in later).
    """
    result = self.qa_chain({"query": question})
    
    return {
        "answer": result["result"],
        "sources": [
            {
                "content": doc.page_content,
                "metadata": doc.metadata
            }
            for doc in result.get("source_documents", [])
        ],
        "question": question
    }
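
A call returns the answer together with its supporting chunks:

result = engine.query("What is RAG?")
print(result["answer"])
for src in result["sources"]:
    print(src["metadata"].get("source"), src["metadata"].get("page"))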

4. Hybrid Retrieval Strategy

Combine dense and sparse retrieval:

def hybrid_retrieve(self, query: str, k: int = 5) -> List[Document]:
    """
    Hybrid retrieval: dense (semantic) + sparse (keyword).
    """
    # Dense retrieval (vector similarity)
    dense_results = self.vectorstore.similarity_search(query, k=k)
    
    # Sparse retrieval (BM25 keyword search)
    sparse_results = self._bm25_search(query, k=k)
    
    # Combine and re-rank
    combined = self._merge_and_rerank(
        dense_results,
        sparse_results,
        query
    )
    
    return combined[:k]

def _merge_and_rerank(
    self,
    dense: List[Document],
    sparse: List[Document],
    query: str
) -> List[Document]:
    """
    Reciprocal Rank Fusion (RRF) for combining results.
    """
    doc_scores = {}
    
    # Score dense results
    for rank, doc in enumerate(dense):
        doc_id = doc.page_content[:100]  # Use content as ID
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1 / (rank + 60)
    
    # Score sparse results
    for rank, doc in enumerate(sparse):
        doc_id = doc.page_content[:100]
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1 / (rank + 60)
    
    # Deduplicate (Document isn't hashable, so key on the same content ID)
    # and sort by combined RRF score
    unique = {}
    for doc in dense + sparse:
        unique.setdefault(doc.page_content[:100], doc)
    sorted_docs = sorted(
        unique.values(),
        key=lambda d: doc_scores.get(d.page_content[:100], 0),
        reverse=True
    )
    
    return sorted_docs
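
The _bm25_search helper is referenced above but never shown. A minimal sketch using the rank_bm25 package, assuming the engine also keeps its raw chunks in a self.documents list, could look like:

from rank_bm25 import BM25Okapi

def _bm25_search(self, query: str, k: int = 5) -> List[Document]:
    """Keyword (sparse) search over the ingested chunks."""
    corpus = [doc.page_content.lower().split() for doc in self.documents]
    bm25 = BM25Okapi(corpus)
    scores = bm25.get_scores(query.lower().split())
    ranked = sorted(zip(self.documents, scores),
                    key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in ranked[:k]]

In practice you'd build the BM25 index once at ingestion time rather than re-tokenizing the corpus on every query.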

5. Evaluation with RAGAS

Measure RAG quality:

def evaluate(self, test_set: List[Dict]) -> Dict:
    """
    Evaluate RAG performance using RAGAS metrics.
    
    Args:
        test_set: List of {question, ground_truth} dicts
    
    Returns:
        Evaluation metrics
    """
    from ragas import evaluate
    from ragas.metrics import (
        faithfulness,
        answer_relevancy,
        context_relevancy
    )
    
    # Run queries
    results = []
    for item in test_set:
        result = self.query(item["question"])
        results.append({
            "question": item["question"],
            "answer": result["answer"],
            "contexts": [s["content"] for s in result["sources"]],
            "ground_truth": item.get("ground_truth", "")
        })
    
    # Evaluate with RAGAS (evaluate() expects a HuggingFace Dataset)
    from datasets import Dataset
    metrics = evaluate(
        Dataset.from_list(results),
        metrics=[faithfulness, answer_relevancy, context_relevancy]
    )
    
    return metrics

RAGAS Metrics:

  • Faithfulness: Is the answer grounded in context?
  • Answer Relevancy: Does it answer the question?
  • Context Relevancy: Is retrieved context relevant?

📊 Performance Optimization

Chunking Strategy

Optimal chunk size depends on your use case:

# For technical docs: larger chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500,
    chunk_overlap=200
)

# For conversational data: smaller chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=100
)

# With semantic splitting
from langchain.text_splitter import SpacyTextSplitter

semantic_splitter = SpacyTextSplitter(
    chunk_size=1000,
    pipeline="en_core_web_sm"
)

Query Optimization

Pre-process queries for better retrieval:

async def optimize_query(self, query: str) -> str:
    """
    Optimize user query for better retrieval.
    """
    prompt = ChatPromptTemplate.from_messages([
        ("system", """Rephrase the user query to be more specific 
        and better suited for semantic search. Keep it concise."""),
        ("user", "Query: {query}\n\nOptimized:")
    ])
    
    chain = prompt | self.llm
    result = await chain.ainvoke({"query": query})
    
    return result.content.strip()

Re-Ranking

Use cross-encoder for better ordering:

from sentence_transformers import CrossEncoder

class ReRanker:
    def __init__(self):
        self.model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
    
    def rerank(
        self,
        query: str,
        documents: List[Document],
        top_k: int = 5
    ) -> List[Document]:
        """Re-rank documents using cross-encoder."""
        # Score all documents
        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.model.predict(pairs)
        
        # Sort by score
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        return [doc for doc, score in scored_docs[:top_k]]
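
Wiring it in is a matter of over-fetching and then cutting down (a sketch; the fetch-20-keep-5 split is a common heuristic, not a measured optimum):

question = "What is RAG?"
reranker = ReRanker()
candidates = engine.hybrid_retrieve(question, k=20)  # over-fetch
top_docs = reranker.rerank(question, candidates, top_k=5)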

🎯 FastAPI Integration

REST API for the RAG system:

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5
    re_rank: bool = False
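
# QueryResponse isn't defined in the post; a minimal model matching the
# dict returned by RAGEngine.query() (hypothetical):
class QueryResponse(BaseModel):
    answer: str
    sources: List[Dict]
    question: str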

@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Query the RAG system."""
    try:
        result = rag.query(
            question=request.question,
            top_k=request.top_k,
            re_rank=request.re_rank
        )
        return QueryResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/ingest")
async def ingest(directory: str):
    """Ingest documents from directory."""
    try:
        count = rag.ingest_directory(directory)
        return {
            "status": "success",
            "documents_ingested": count
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

📈 Real-World Performance

Metrics from production testing:

Metric               Value
-------------------  ------------------
Faithfulness         0.92
Answer Relevancy     0.88
Context Relevancy    0.85
Avg Response Time    1.2 s
Documents Indexed    10,000+
Query Throughput     ~50 queries/min

Cost Analysis:

  • Embedding: ~$0.0001 per 1K tokens
  • Query: ~$0.002 per query (GPT-3.5)
  • Monthly (1000 queries): ~$2

🚧 Challenges Solved

Challenge 1: Irrelevant Retrieval

Problem: Vector search returns semantically similar but irrelevant docs.

Solution:

  • Hybrid search (dense + sparse)
  • Re-ranking with cross-encoder
  • Query optimization

Challenge 2: Long Documents

Problem: Entire documents don't fit in the LLM's context window.

Solution:

  • Smart chunking with overlap
  • Hierarchical summarization
  • Map-reduce for long context

Challenge 3: Answer Hallucination

Problem: LLM generates plausible but incorrect answers.

Solution:

  • Clear system prompts
  • Evaluation with RAGAS
  • Source citation requirements

Challenge 4: Slow Retrieval

Problem: Vector search latency for large collections.

Solution:

  • Optimized chunk size
  • Indexed metadata
  • Caching frequent queries

💡 Best Practices

  1. Chunk Wisely: Test different sizes for your use case
  2. Use Metadata: Store page numbers, sources, dates
  3. Evaluate Regularly: Use RAGAS on test sets
  4. Monitor Costs: Track embedding and query tokens
  5. Version Control: Track embedding model versions
  6. Cache Strategically: Cache embeddings, not queries
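
For point 6, LangChain ships a cache wrapper for embeddings; a minimal sketch (the store path and namespace are illustrative):

from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore

store = LocalFileStore("./embedding_cache")
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    OpenAIEmbeddings(), store, namespace="text-embedding-ada-002"
)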

🎓 Advanced Techniques

Self-Querying

Let the LLM generate its own search filter:

from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever

metadata_field_info = [
    AttributeInfo(
        name="source",
        description="The document source",
        type="string"
    ),
    AttributeInfo(
        name="page",
        description="The page number",
        type="integer"
    ),
]

self_query_retriever = SelfQueryRetriever.from_llm(
    llm=self.llm,
    vectorstore=self.vectorstore,
    document_contents="Technical documentation",
    metadata_field_info=metadata_field_info
)

Multi-Query RAG

Generate multiple query variations:

async def multi_query_retrieve(self, query: str) -> List[Document]:
    """Generate multiple query variations for better coverage."""
    # Generate variations
    variations = await self._generate_query_variations(query)
    
    # Retrieve for each
    all_docs = []
    for var in variations:
        docs = self.vectorstore.similarity_search(var, k=3)
        all_docs.extend(docs)
    
    # Deduplicate by content (a cross-encoder re-rank could follow here)
    unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
    return unique_docs
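
The _generate_query_variations helper is referenced but not shown; a minimal sketch that asks the LLM for paraphrases could look like:

async def _generate_query_variations(self, query: str) -> List[str]:
    """Ask the LLM for a few short paraphrases of the query."""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Rewrite the user's search query three different ways, "
                   "one per line. Keep each rewrite short and specific."),
        ("user", "{query}")
    ])
    result = await (prompt | self.llm).ainvoke({"query": query})
    lines = [ln.strip("- ").strip() for ln in result.content.splitlines()]
    return [query] + [ln for ln in lines if ln][:3]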

📦 Tech Stack

  • Vector DB: ChromaDB (local), Pinecone (cloud)
  • Embeddings: OpenAI, Sentence Transformers
  • LLM: GPT-3.5/4 via LangChain
  • Evaluation: RAGAS framework
  • Backend: FastAPI
  • Document Loading: PyPDF, python-docx

🔗 Try It Yourself

# Install
pip install -r requirements.txt

# Ingest documents
python src/ingest.py --path ./data/documents

# Query
python src/query.py "What is RAG?"

# Run API
python src/main.py

Next in Series: Knowledge Graph Generator - Extract entities and relationships to build knowledge graphs.