# State-of-the-Art RAG Implementation

Features:
- Hybrid retrieval (BM25 + vector search)
- Multi-stage retrieval with reranking
- Advanced chunking strategies
- Multi-document support
- Metadata filtering
- Contextual compression
- Web search integration

In [None]:
# Import required libraries
import os
import re
import numpy as np
from typing import List, Dict, Any, Union
import requests
import httpx

# LangChain imports
from langchain_community.document_loaders import TextLoader, PyPDFLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever
from langchain_community.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.documents import Document

In [None]:
# ---------------------------------------------------------------------------
# Notebook-wide settings
# ---------------------------------------------------------------------------

# Ollama model tag; used for both the embedding model and the chat model.
MODEL_NAME = "gemma3:12b"

# On-disk locations: source documents and the persisted Chroma index.
DOCS_DIR = "documents"
VECTOR_DB_PATH = "chroma_db"

# Text-splitter settings (chunk length and overlap between adjacent chunks).
CHUNK_SIZE, CHUNK_OVERLAP = 1000, 200

# Make sure the documents folder is present before anything tries to list it.
os.makedirs(DOCS_DIR, exist_ok=True)

## Document Loading and Processing

In [None]:
class DocumentProcessor:
    """Load files from a documents directory and split them into chunks.

    The instance also owns the embedding model (``self.embeddings``) and the
    text splitter (``self.text_splitter``) that other components reuse.
    """

    def __init__(self, docs_dir=DOCS_DIR):
        """Create the embedding model and text splitter.

        Args:
            docs_dir: Directory that ``load_documents`` scans and that
                ``add_document`` copies new files into.
        """
        self.docs_dir = docs_dir
        self.embeddings = OllamaEmbeddings(model=MODEL_NAME)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            add_start_index=True,
        )

    def load_single_document(self, file_path):
        """Load one file with the loader that matches its extension.

        Args:
            file_path: Path to a ``.pdf``, ``.txt``, ``.md`` or ``.html`` file.

        Returns:
            Whatever the selected loader's ``load()`` returns.

        Raises:
            ValueError: If the extension is not one of the supported ones.
        """
        # Compare case-insensitively so "REPORT.PDF" is treated like "report.pdf".
        lowered = file_path.lower()
        if lowered.endswith('.pdf'):
            loader = PyPDFLoader(file_path)
        elif lowered.endswith(('.txt', '.md', '.html')):
            loader = TextLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_path}")
        return loader.load()

    def load_documents(self):
        """Load every regular file found directly inside ``self.docs_dir``.

        Files that fail to load are reported with ``print`` and skipped, so a
        single bad file does not abort the whole run.

        Returns:
            A list with the documents of all files that loaded successfully;
            each document's ``metadata['source']`` is set to the bare filename.
        """
        documents = []
        # Sort so the resulting document order does not depend on the
        # arbitrary order in which the OS lists directory entries.
        for filename in sorted(os.listdir(self.docs_dir)):
            file_path = os.path.join(self.docs_dir, filename)
            if not os.path.isfile(file_path):
                continue
            try:
                docs = self.load_single_document(file_path)
            except Exception as e:
                # Deliberate best-effort: report and move on to the next file.
                print(f"Error loading {file_path}: {e}")
                continue
            for doc in docs:
                doc.metadata['source'] = filename
            documents.extend(docs)
        return documents

    def process_documents(self):
        """Load all documents and split them into chunks.

        Returns:
            The chunks produced by the text splitter, or an empty list (after
            printing a hint) when no document could be loaded.
        """
        documents = self.load_documents()
        if not documents:
            # Name the directory actually in use rather than the default one.
            print(f"No documents found. Please add documents to the '{self.docs_dir}' directory.")
            return []
        return self.text_splitter.split_documents(documents)

    def create_document_from_text(self, text, metadata=None):
        """Wrap raw text in a ``Document`` and split it into chunks.

        Args:
            text: The page content.
            metadata: Optional metadata dict; an empty dict is used when
                omitted (or falsy).

        Returns:
            The chunks produced by the text splitter for that one document.
        """
        metadata = metadata or {}
        doc = Document(page_content=text, metadata=metadata)
        return self.text_splitter.split_documents([doc])

    def add_document(self, file_path):
        """Copy a file into the documents directory and load it.

        Args:
            file_path: Path of the file to add.

        Returns:
            The documents loaded from the copy inside ``self.docs_dir``.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the file type is unsupported (from
                ``load_single_document``).
        """
        import shutil  # local import: only this method needs it

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        filename = os.path.basename(file_path)
        destination = os.path.join(self.docs_dir, filename)

        # If the file already lives in the documents directory, source and
        # destination are the same file; opening it for writing would truncate
        # it before it is read, so skip the copy in that case.
        already_in_place = (
            os.path.exists(destination)
            and os.path.samefile(file_path, destination)
        )
        if not already_in_place:
            # Streams the content instead of reading the whole file into memory.
            shutil.copyfile(file_path, destination)

        return self.load_single_document(destination)

## Web Search Integration

In [None]:
class WebSearchTool:
    """Query DuckDuckGo and convert the results into chunked documents.

    The DuckDuckGo API endpoint is tried first; when its response carries
    neither an abstract nor related topics, a second search endpoint is
    queried instead. Result text is chunked through the supplied processor.
    """

    def __init__(self, processor):
        """Store the processor used to turn result text into document chunks.

        Args:
            processor: Object exposing ``create_document_from_text(text,
                metadata=...)``.
        """
        self.processor = processor

    @staticmethod
    def _topic_texts(topics):
        """Yield the non-empty ``Text`` of each topic, descending into groups.

        Entries without ``Text`` may be category groups that hold their items
        under a ``Topics`` key; those are flattened in order.
        """
        for topic in topics:
            if not isinstance(topic, dict):
                continue
            if topic.get('Text'):
                yield topic['Text']
            else:
                yield from WebSearchTool._topic_texts(topic.get('Topics') or [])

    @staticmethod
    def _instant_answer_snippets(results, num_results):
        """Build at most ``num_results`` text snippets from the API payload.

        The abstract (if any) comes first, followed by related-topic texts.
        """
        snippets = []
        if num_results <= 0:
            return snippets

        if results.get('AbstractText'):
            snippets.append(
                f"Abstract: {results['AbstractText']}\nSource: {results.get('AbstractSource', '')}"
            )

        # Filter first, then cap: entries without text must not consume the
        # quota, otherwise fewer than ``num_results`` snippets are returned.
        for text in WebSearchTool._topic_texts(results.get('RelatedTopics') or []):
            if len(snippets) >= num_results:
                break
            snippets.append(text)
        return snippets

    @staticmethod
    def _fallback_snippets(results, num_results):
        """Format the first ``num_results`` entries of the fallback response."""
        snippets = []
        for result in results[:num_results]:
            title = result.get('title', '')
            snippet = result.get('snippet', '')
            url = result.get('link', '')
            snippets.append(f"Title: {title}\nURL: {url}\nContent: {snippet}")
        return snippets

    def search(self, query, num_results=3):
        """Search the web and return the results as document chunks.

        Args:
            query: Search string.
            num_results: Maximum number of result snippets to convert.

        Returns:
            A list of document chunks (metadata carries ``source`` =
            ``web_search_<i>`` and the ``query``). An empty list is returned
            when ``num_results`` is not positive, on a non-200 response, or
            when any error occurs (the error is printed, not raised).
        """
        if num_results <= 0:
            # Nothing was asked for; avoid a pointless network round trip.
            return []

        try:
            # Query the DuckDuckGo API endpoint directly.
            response = httpx.get(
                "https://api.duckduckgo.com/",
                params={
                    "q": query,
                    "format": "json",
                    "no_html": 1,
                    "no_redirect": 1
                },
                timeout=10.0
            )

            if response.status_code != 200:
                print(f"Error searching the web: {response.status_code}")
                return []

            results = response.json()
            if results.get('AbstractText') or results.get('RelatedTopics'):
                web_results = self._instant_answer_snippets(results, num_results)
            else:
                # Nothing usable in the first response: try the fallback
                # search endpoint.
                response = httpx.get(
                    "https://ddg-api.herokuapp.com/search",
                    params={"query": query, "limit": num_results},
                    timeout=10.0
                )

                if response.status_code != 200:
                    print(f"Error with fallback search: {response.status_code}")
                    return []

                web_results = self._fallback_snippets(response.json(), num_results)

            # Convert each snippet into chunked documents.
            documents = []
            for i, result in enumerate(web_results):
                chunks = self.processor.create_document_from_text(
                    result,
                    metadata={"source": f"web_search_{i}", "query": query}
                )
                documents.extend(chunks)

            return documents
        except Exception as e:
            # Web search is best-effort: report the problem and return nothing
            # so callers can still answer from local documents.
            print(f"Error during web search: {str(e)}")
            return []

## Advanced Retrieval System

In [None]:
class AdvancedRetriever:
    """Hybrid retriever: Chroma vector search + BM25, optionally plus web search."""

    def __init__(self, processor, web_search=None):
        """Store collaborators; the retriever itself is built lazily.

        Args:
            processor: Supplies ``process_documents()`` and ``embeddings``.
            web_search: Optional object with ``search(query, num_results=...)``.
        """
        self.processor = processor
        self.web_search = web_search
        self.vector_store = None
        self.retriever = None

    def build_retriever(self, documents=None):
        """Build the ensemble retriever from document chunks.

        Args:
            documents: Chunks to index. When ``None``, they are obtained from
                ``self.processor.process_documents()``.

        Returns:
            The new ``EnsembleRetriever`` (also stored on ``self.retriever``),
            or ``None`` when there is nothing to index, in which case the
            existing state is left untouched.
        """
        if documents is None:
            documents = self.processor.process_documents()

        if not documents:
            print("No documents to build retriever from.")
            return None

        # On a rebuild, drop the collection created by the previous build.
        # Otherwise the same chunks are appended again to the persisted
        # collection and vector search starts returning duplicates.
        # NOTE(review): a collection persisted by an earlier session is still
        # appended to on the first build of a new session.
        if self.vector_store is not None:
            self.vector_store.delete_collection()
            self.vector_store = None

        # Create the vector store
        self.vector_store = Chroma.from_documents(
            documents=documents,
            embedding=self.processor.embeddings,
            persist_directory=VECTOR_DB_PATH
        )
        vector_retriever = self.vector_store.as_retriever(search_kwargs={"k": 4})

        # Create BM25 retriever
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 4

        # Combine retrievers (vector results weighted higher than BM25)
        self.retriever = EnsembleRetriever(
            retrievers=[vector_retriever, bm25_retriever],
            weights=[0.7, 0.3]
        )

        return self.retriever

    def search(self, query, use_web=True, k=5):
        """Retrieve up to ``k`` documents for ``query``.

        Args:
            query: Search string.
            use_web: Whether to also query the web search tool (if one was
                supplied).
            k: Maximum number of documents to return.

        Returns:
            Local results first, followed by web results whose content is not
            already present, truncated to ``k``. If no local retriever can be
            built, only web results (or an empty list) are returned.
        """
        if self.retriever is None:
            self.build_retriever()

        web_enabled = bool(use_web and self.web_search)

        if self.retriever is None:
            # build_retriever had nothing to index: fall back to the web only.
            if web_enabled:
                return self.web_search.search(query, num_results=k)
            return []

        # ``invoke`` is the supported retriever entry point;
        # ``get_relevant_documents`` is deprecated.
        results = self.retriever.invoke(query)

        if web_enabled:
            web_results = self.web_search.search(query)
            if web_results:
                # Local documents come first so they win when content repeats.
                seen_content = set()
                unique_results = []
                for doc in results + web_results:
                    if doc.page_content not in seen_content:
                        seen_content.add(doc.page_content)
                        unique_results.append(doc)
                return unique_results[:k]

        return results[:k]

## RAG Question Answering

In [None]:
class RAGSystem:
    """Main RAG system: wires together processing, retrieval and the chat model."""

    def __init__(self):
        """Create all components and seed the documents directory if empty."""
        self.processor = DocumentProcessor()
        self.web_search = WebSearchTool(self.processor)
        self.retriever = AdvancedRetriever(self.processor, self.web_search)
        self.llm = ChatOllama(model=MODEL_NAME, temperature=0.1)

        # Create a sample document if the documents directory is empty.
        # Use the processor's directory so the check and the later loading
        # always look at the same place.
        docs_dir = self.processor.docs_dir
        if not os.listdir(docs_dir):
            sample_path = os.path.join(docs_dir, "sample.txt")
            with open(sample_path, "w", encoding="utf-8") as f:
                f.write("This is a sample document for testing the RAG system.\n")
                f.write("The system combines vector search, BM25, and web search capabilities.\n")
                f.write("You can add your own documents to the 'documents' directory.\n")

    def initialize(self):
        """Process the documents and build the retriever.

        Returns:
            ``self``, so construction and initialization can be chained.
        """
        documents = self.processor.process_documents()
        self.retriever.build_retriever(documents)
        return self

    def answer(self, query, use_web=True):
        """Answer ``query`` from retrieved context.

        Args:
            query: The user's question.
            use_web: Forwarded to the retriever's ``search``.

        Returns:
            The model's answer text, or a fixed apology string when no
            documents were retrieved.
        """
        docs = self.retriever.search(query, use_web=use_web)

        if not docs:
            return "I couldn't find any relevant information to answer your question."

        # Number the documents so the model can tell the sources apart.
        context = "\n\n".join(
            f"Document {i+1}:\n{doc.page_content}" for i, doc in enumerate(docs)
        )

        prompt = ChatPromptTemplate.from_template("""
        Answer the following question based on the provided context.
        If the answer is not in the context, say "I don't have enough information to answer this question."
        
        Context:
        {context}
        
        Question: {query}
        
        Answer:
        """)

        chain = prompt | self.llm
        response = chain.invoke({"context": context, "query": query})

        return response.content

    def add_document(self, file_path):
        """Add a new document and rebuild the retriever.

        Args:
            file_path: Path of the file to add.

        Returns:
            The number of chunks the new document was split into.
        """
        documents = self.processor.add_document(file_path)
        chunks = self.processor.text_splitter.split_documents(documents)

        # The processor has copied the file into the documents directory, and
        # the rebuild below re-reads that whole directory. Adding ``chunks``
        # to the vector store here as well would index the new document twice.
        self.retriever.build_retriever()

        return len(chunks)

## Usage Example

In [None]:
# Construct the system, then process the documents and build its retriever.
# initialize() hands back the same instance, so the name can be rebound to it.
rag_system = RAGSystem()
rag_system = rag_system.initialize()

# Ask a first question and show the question/answer pair.
query = "What is a hybrid RAG system?"
answer = rag_system.answer(query=query)
print(f"Query: {query}")
print(f"Answer: {answer}")

In [None]:
# Second question, with web results explicitly enabled.
query = "What are the latest developments in large language models?"
answer = rag_system.answer(query=query, use_web=True)
print(f"Query: {query}")
print(f"Answer: {answer}")

## Adding Your Own Documents

In [None]:
# Example: Add your own document
# Replace with the path to your document
# document_path = "/path/to/your/document.pdf"
# num_chunks = rag_system.add_document(document_path)
# print(f"Added document with {num_chunks} chunks")