# enhanced_combined.py
import os
import pickle
import json
import nltk
import requests
import time
from bs4 import BeautifulSoup
from urllib.parse import quote
from langchain_community.document_loaders import PDFPlumberLoader, WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document  # used to wrap crawled web content for BM25 retrieval
# Ensure the NLTK punkt tokenizer is available.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')


class ModularRAG:
    def __init__(self):
        self.storage_path = "./rag_data"
        # Create the storage directories if they do not already exist (parents included).
        os.makedirs(os.path.join(self.storage_path, "documents"), exist_ok=True)
        os.makedirs(os.path.join(self.storage_path, "web_results"), exist_ok=True)
        self.documents = []
        self.web_results = []
        # Web crawler settings
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        self.num_search_results = 10
        self.max_depth = 2
        self.max_links_per_page = 5
        self.max_paragraphs = 5
        self._load_saved_data()

    def _load_saved_data(self):
        doc_path = os.path.join(self.storage_path, "documents", "docs.pkl")
        web_path = os.path.join(self.storage_path, "web_results", "web.json")
        if os.path.exists(doc_path):
            try:
                with open(doc_path, 'rb') as f:
                    self.documents = pickle.load(f)
            except Exception as e:
                print(f"Error loading documents: {e}")
        if os.path.exists(web_path):
            try:
                with open(web_path, 'r', encoding='utf-8') as f:
                    self.web_results = json.load(f)
            except Exception as e:
                print(f"Error loading web results: {e}")

    def _save_documents(self):
        doc_path = os.path.join(self.storage_path, "documents", "docs.pkl")
        try:
            with open(doc_path, 'wb') as f:
                pickle.dump(self.documents, f)
        except Exception as e:
            print(f"Error saving documents: {e}")

    def _save_web_results(self):
        web_path = os.path.join(self.storage_path, "web_results", "web.json")
        try:
            with open(web_path, 'w', encoding='utf-8') as f:
                json.dump(self.web_results, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Error saving web results: {e}")

    def load_pdf(self, file_path):
        """Load a PDF, split it into chunks, and add the chunks to the document store."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        try:
            loader = PDFPlumberLoader(file_path)
            documents = loader.load()
            if documents:
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=1000,
                    chunk_overlap=200,
                    add_start_index=True
                )
                chunked_docs = text_splitter.split_documents(documents)
                self.documents.extend(chunked_docs)
                self._save_documents()
                return len(chunked_docs)
            return 0
        except Exception as e:
            raise Exception(f"Error loading PDF: {e}")

    def search_duckduckgo(self, query, num_results=None):
        """Search DuckDuckGo's HTML endpoint and return a list of result URLs."""
        if num_results is None:
            num_results = self.num_search_results
        try:
            search_url = f"https://html.duckduckgo.com/html/?q={quote(query)}"
            response = requests.get(search_url, headers=self.headers, timeout=10)
            if response.status_code != 200:
                print(f"Web search error: HTTP {response.status_code}")
                return []
            soup = BeautifulSoup(response.text, 'html.parser')
            results = []
            for element in soup.select('.result__url, .result__a'):
                href = element.get('href') if 'href' in element.attrs else None
                if href and not href.startswith('/') and (href.startswith('http://') or href.startswith('https://')):
                    results.append(href)
                elif not href and element.find('a') and 'href' in element.find('a').attrs:
                    href = element.find('a')['href']
                    if href and not href.startswith('/'):
                        results.append(href)
            unique_results = list(set(results))
            return unique_results[:num_results]
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")
            return []

    def crawl_page(self, url, depth=0):
        """Fetch a page and return (content dict, outgoing links); returns nothing beyond max_depth."""
        if depth > self.max_depth:
            return None, []
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string if soup.title and soup.title.string else "Untitled"
            paragraphs = []
            for p in soup.find_all('p'):
                text = p.get_text(strip=True)
                if len(text) > 50:
                    paragraphs.append(text)
                if len(paragraphs) >= self.max_paragraphs:
                    break
            links = []
            for a in soup.find_all('a', href=True):
                href = a['href']
                if href.startswith('http') and href != url:
                    links.append(href)
                if len(links) >= self.max_links_per_page:
                    break
            content = {
                "url": url,
                "title": title,
                "paragraphs": paragraphs
            }
            return content, links
        except Exception as e:
            print(f"Error crawling page {url}: {e}")
            return None, []

    def crawl_website(self, start_url, max_pages=10):
        visited = set()
        to_visit = [start_url]
        contents = []
        while to_visit and len(visited) < max_pages:
            current_url = to_visit.pop(0)
            if current_url in visited:
                continue
            content, links = self.crawl_page(current_url)
            visited.add(current_url)
            if content and content["paragraphs"]:
                contents.append(content)
            for link in links:
                if link not in visited and link not in to_visit:
                    to_visit.append(link)
            time.sleep(1)
        return contents

    def crawl_web(self, query):
        """Search the web for the query, crawl the top results, and return (raw results, retriever docs)."""
        urls = self.search_duckduckgo(query)
        if not urls:
            print("No results found.")
            return [], []
        all_results = []
        for url in urls[:3]:  # Limit to first 3 URLs for efficiency
            content, links = self.crawl_page(url)
            if content and content["paragraphs"]:
                all_results.append(content)
                # Follow links from the main page (shallow recursive crawling)
                for link in links[:2]:  # Limit to first 2 links
                    sub_content, _ = self.crawl_page(link, depth=1)
                    if sub_content and sub_content["paragraphs"]:
                        all_results.append(sub_content)
                    time.sleep(1)
            time.sleep(1)
        self.web_results = all_results
        self._save_web_results()
        # Convert web results to Document objects so BM25Retriever can index them
        web_docs = []
        for result in all_results:
            text = f"[{result['title']}]\n" + "\n".join(result['paragraphs'])
            web_docs.append(Document(page_content=text, metadata={"source": result['url']}))
        return all_results, web_docs

    def build_retriever(self, documents):
        if not documents:
            return None
        # Create BM25 retriever
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 3  # Return top 3 results
        return bm25_retriever

    def get_relevant_documents(self, query, documents):
        retriever = self.build_retriever(documents)
        if not retriever:
            return []
        return retriever.get_relevant_documents(query)

    def extract_context_from_documents(self, query):
        if not self.documents:
            return None
        relevant_docs = self.get_relevant_documents(query, self.documents)
        if not relevant_docs:
            return None
        context = "\n\n".join([doc.page_content for doc in relevant_docs])
        return context

    def extract_context_from_web(self, web_results, web_docs, query):
        if not web_results or not web_docs:
            return None, []
        # Try to use the retriever for better results
        if web_docs:
            relevant_docs = self.get_relevant_documents(query, web_docs)
            if relevant_docs:
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                sources = [doc.metadata.get("source", "") for doc in relevant_docs if "source" in doc.metadata]
                return context, sources
        # Fall back to simple extraction if the retriever fails
        contexts = []
        sources = []
        for doc in web_results:
            context_text = "\n".join(doc["paragraphs"])
            contexts.append(f"[{doc['title']}] {context_text}")
            sources.append(doc['url'])
        context = "\n\n".join(contexts)
        return context, sources
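
# Illustrative sketch (not part of the original flow): the class can also be used directly
# for local-document QA, assuming a PDF exists at the hypothetical path "manual.pdf".
#
#     rag = ModularRAG()
#     num_chunks = rag.load_pdf("manual.pdf")   # placeholder path
#     context = rag.extract_context_from_documents("installation steps")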


def get_context(query, crawl_params=None):
    """
    Modular RAG pipeline for answering questions using local documents and web search.

    Parameters:
        query (str): The question (in Persian).
        crawl_params (dict, optional): Web crawling parameters:
            - max_depth: maximum crawl depth
            - max_links_per_page: maximum number of links extracted per page
            - max_paragraphs: maximum number of paragraphs extracted per page
            - num_search_results: number of search results

    Returns:
        dict: Search result containing the context text and its sources.
    """
    rag = ModularRAG()
    # Configure crawling parameters if provided
    if crawl_params:
        if 'max_depth' in crawl_params:
            rag.max_depth = crawl_params['max_depth']
        if 'max_links_per_page' in crawl_params:
            rag.max_links_per_page = crawl_params['max_links_per_page']
        if 'max_paragraphs' in crawl_params:
            rag.max_paragraphs = crawl_params['max_paragraphs']
        if 'num_search_results' in crawl_params:
            rag.num_search_results = crawl_params['num_search_results']
    # First try to get context from local documents
    doc_context = rag.extract_context_from_documents(query)
    if doc_context:
        return {
            "has_context": True,
            "context": doc_context,
            "source": "documents",
            "language": "fa"
        }
    # Fall back to web search
    web_results, web_docs = rag.crawl_web(query)
    if web_results:
        web_context, sources = rag.extract_context_from_web(web_results, web_docs, query)
        return {
            "has_context": True,
            "context": web_context,
            "source": "web",
            "sources": sources,
            "language": "fa"
        }
    # No context found
    return {
        "has_context": False,
        "context": "Unfortunately, no information was found for your question.",
        "source": "none",
        "language": "fa"
    }
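

if __name__ == "__main__":
    # Minimal usage sketch, assuming network access and the default ./rag_data layout;
    # the sample query and crawl limits below are illustrative, not part of the pipeline itself.
    sample_params = {
        "max_depth": 1,
        "max_links_per_page": 3,
        "max_paragraphs": 3,
        "num_search_results": 5,
    }
    result = get_context("هوش مصنوعی چیست؟", crawl_params=sample_params)  # "What is artificial intelligence?"
    print(result["source"])
    print(result["context"][:500])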