
dspy-rag-pipeline

npx machina-cli add skill OmidZamani/dspy-skills/dspy-rag-pipeline --openclaw

DSPy RAG Pipeline

Goal

Build retrieval-augmented generation pipelines with ColBERTv2 that can be systematically optimized.

When to Use

  • Questions require external knowledge
  • You have a document corpus to search
  • You need grounded, factual responses
  • You want to optimize retrieval and generation jointly

Inputs

Input    | Type          | Description
question | str           | User query
k        | int           | Number of passages to retrieve
rm       | dspy.Retrieve | Retrieval model (ColBERTv2)

Outputs

Output  | Type      | Description
context | list[str] | Retrieved passages
answer  | str       | Generated response

Workflow

Phase 1: Configure Retrieval

import dspy

# Configure LM and retriever
colbert = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')
dspy.configure(
    lm=dspy.LM("openai/gpt-4o-mini"),
    rm=colbert
)

Phase 2: Define Signature

class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""
    context: list[str] = dspy.InputField(desc="May contain relevant facts")
    question: str = dspy.InputField()
    answer: str = dspy.OutputField(desc="Often between 1 and 5 words")

Phase 3: Build RAG Module

class RAG(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought(GenerateAnswer)
    
    def forward(self, question):
        context = self.retrieve(question).passages
        pred = self.generate(context=context, question=question)
        return dspy.Prediction(context=context, answer=pred.answer)

Phase 4: Use

rag = RAG(num_passages=3)
result = rag(question="What is the capital of France?")
print(result.answer)  # Paris

Production Example

import dspy
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate import Evaluate
import logging

logger = logging.getLogger(__name__)

class GenerateAnswer(dspy.Signature):
    """Answer questions using the provided context."""
    context: list[str] = dspy.InputField(desc="Retrieved passages")
    question: str = dspy.InputField()
    answer: str = dspy.OutputField(desc="Concise factual answer")

class ProductionRAG(dspy.Module):
    def __init__(self, num_passages=5):
        super().__init__()
        self.num_passages = num_passages
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought(GenerateAnswer)
    
    def forward(self, question: str):
        try:
            # Retrieve
            retrieval_result = self.retrieve(question)
            context = retrieval_result.passages
            
            if not context:
                logger.warning(f"No passages retrieved for: {question}")
                return dspy.Prediction(
                    context=[],
                    answer="I couldn't find relevant information."
                )
            
            # Generate
            pred = self.generate(context=context, question=question)
            
            return dspy.Prediction(
                context=context,
                answer=pred.answer,
                reasoning=getattr(pred, 'reasoning', None)
            )
            
        except Exception as e:
            logger.error(f"RAG failed: {e}")
            return dspy.Prediction(
                context=[],
                answer="An error occurred while processing your question."
            )

def validate_answer(example, pred, trace=None):
    """Check if answer is grounded and correct."""
    if not pred.answer or not pred.context:
        return 0.0
    
    # Check correctness
    correct = example.answer.lower() in pred.answer.lower()
    
    # Check grounding (answer should relate to context)
    context_text = " ".join(pred.context).lower()
    grounded = any(word in context_text for word in pred.answer.lower().split())
    
    return float(correct and grounded)

def build_optimized_rag(trainset, devset):
    """Build and optimize a RAG pipeline."""
    
    # Configure
    colbert = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')
    dspy.configure(
        lm=dspy.LM("openai/gpt-4o-mini"),
        rm=colbert
    )
    
    # Build
    rag = ProductionRAG(num_passages=5)
    
    # Evaluate baseline
    evaluator = Evaluate(devset=devset, metric=validate_answer, num_threads=8)
    baseline = evaluator(rag)
    logger.info(f"Baseline: {baseline}")  # Evaluate returns the average metric score
    
    # Optimize
    optimizer = BootstrapFewShot(
        metric=validate_answer,
        max_bootstrapped_demos=4,
        max_labeled_demos=4
    )
    compiled = optimizer.compile(rag, trainset=trainset)
    
    optimized = evaluator(compiled)
    logger.info(f"Optimized: {optimized}")
    
    compiled.save("rag_optimized.json")
    return compiled

Multi-Hop RAG

class MultiHopRAG(dspy.Module):
    """RAG with iterative retrieval for complex questions."""
    
    def __init__(self, num_hops=2, passages_per_hop=3):
        super().__init__()
        self.num_hops = num_hops
        self.retrieve = dspy.Retrieve(k=passages_per_hop)
        self.generate_query = dspy.ChainOfThought("context, question -> search_query")
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
    
    def forward(self, question):
        context = []
        
        for hop in range(self.num_hops):
            # First hop: use original question
            # Later hops: generate refined query
            if hop == 0:
                query = question
            else:
                query = self.generate_query(
                    context=context,
                    question=question
                ).search_query
            
            # Retrieve and accumulate
            new_passages = self.retrieve(query).passages
            context.extend(new_passages)
        
        # Generate final answer
        pred = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=pred.answer)
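
Because each hop extends `context`, the same passage can be retrieved more than once across hops. A small order-preserving dedup helper (illustrative, not a DSPy API) can be applied before the final `generate_answer` call:

```python
def dedupe_passages(passages):
    """Drop duplicate passages while keeping first-seen (retrieval) order."""
    seen = set()
    unique = []
    for p in passages:
        if p not in seen:
            seen.add(p)
            unique.append(p)
    return unique
```

In `forward`, `context = dedupe_passages(context)` just before generating the answer keeps the prompt shorter without losing information.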

Best Practices

  1. Tune k carefully - More passages = more context but also noise
  2. Signature descriptions matter - Guide the model with field descriptions
  3. Validate grounding - Ensure answers come from retrieved context
  4. Consider multi-hop - Complex questions may need iterative retrieval
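
The grounding check in `validate_answer` passes if any answer word appears in the context, including stop words like "the". A slightly stricter sketch (the stop-word list is illustrative) compares content words only:

```python
STOPWORDS = {"the", "a", "an", "of", "in", "on", "at", "is", "are", "and", "to", "for"}

def grounded(answer: str, passages: list[str]) -> bool:
    """True if any content word of the answer appears in the retrieved context."""
    context_words = set(" ".join(passages).lower().split())
    answer_words = {w for w in answer.lower().split() if w not in STOPWORDS}
    if not answer_words:
        return False  # an answer made only of stop words is never grounded
    return bool(answer_words & context_words)
```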

Limitations

  • Retrieval quality bounds generation quality
  • ColBERTv2 requires hosted index
  • Context length limits affect passage count
  • Latency increases with more passages
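
One way to work around the context-length limitation is to cap the total passage text before generation. A rough character-budget sketch (a production version would count tokens with the model's tokenizer; 4000 is an arbitrary default):

```python
def trim_to_budget(passages, max_chars=4000):
    """Keep passages in retrieval (relevance) order until the budget is exhausted."""
    kept, used = [], 0
    for p in passages:
        if used + len(p) > max_chars:
            break  # stop at the first passage that would overflow the budget
        kept.append(p)
        used += len(p)
    return kept
```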

Source

git clone https://github.com/OmidZamani/dspy-skills.git
# Skill file: skills/dspy-rag-pipeline/SKILL.md

Overview

This skill helps you build retrieval-augmented generation pipelines using ColBERTv2 in DSPy. It covers wiring a retriever, defining a generation signature, and assembling a RAG module to fetch external passages and generate grounded answers.

How This Skill Works

Configure LM and ColBERTv2 retriever, then define a DSPy GenerateAnswer signature. Build a RAG module that retrieves k passages and feeds them to the generator to produce a grounded answer. The workflow returns both the retrieved context and the answer.

When to Use It

  • When external knowledge is required to answer questions.
  • When you have a document corpus to search and ground responses in retrieved passages.
  • When you need grounded, factual responses rather than purely generative text.
  • When you want to optimize retrieval and generation jointly for better end-to-end quality.
  • When multi-hop context retrieval is needed to answer complex queries.

Quick Start

  1. Configure the LM and ColBERTv2 retriever (e.g., set the ColBERTv2 url) using dspy.configure.
  2. Define a GenerateAnswer signature with the fields context, question, and answer.
  3. Build a RAG class that retrieves passages and generates an answer, then run rag = RAG(num_passages=3); result = rag(question="Your question?").

Best Practices

  • Tune k to balance retrieval coverage with latency and cost.
  • Ensure retrieved passages are relevant before feeding them to the generator.
  • Handle empty retrieval gracefully with a clear fallback message.
  • Keep generated answers concise and clearly tied to the retrieved context.
  • Periodically evaluate retrieval quality and retriever configuration (e.g., ColBERTv2 URL).

Example Use Cases

  • Answering customer support questions using a product manual repository.
  • Academic Q&A over lecture notes and research papers.
  • Technical documentation lookup for developer questions.
  • Grounding answers to a database of policy or regulatory documents.
  • Multi-hop grounding over a collection of clinical guidelines.
