From 6b0442c791007b0102a862c494f6d7aa182cc8f9 Mon Sep 17 00:00:00 2001 From: MasihMoafi Date: Fri, 2 May 2025 11:02:36 +0000 Subject: [PATCH] Update enhanced_combined.py --- enhanced_combined.py | 744 ++++++++++++++++++------------------------- 1 file changed, 312 insertions(+), 432 deletions(-) diff --git a/enhanced_combined.py b/enhanced_combined.py index 77731c7..fd82cfa 100644 --- a/enhanced_combined.py +++ b/enhanced_combined.py @@ -1,460 +1,340 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - import os -import re +import pickle import json -import ssl -import argparse +import nltk import requests +import time from bs4 import BeautifulSoup from urllib.parse import quote -from langchain_community.embeddings import HuggingFaceEmbeddings -from langchain_community.vectorstores import Chroma -from langchain_core.documents import Document -import traceback +from langchain_community.document_loaders import PDFPlumberLoader, WebBaseLoader +from langchain_text_splitters import RecursiveCharacterTextSplitter +from langchain_community.retrievers import BM25Retriever -# Disable SSL warnings and proxy settings -ssl._create_default_https_context = ssl._create_unverified_context -requests.packages.urllib3.disable_warnings() +try: + nltk.data.find('tokenizers/punkt') +except LookupError: + nltk.download('punkt') -def clear_proxy_settings(): - """Remove proxy environment variables that might cause connection issues.""" - for var in ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"]: - if var in os.environ: - print(f"Removing proxy env var: {var}") - del os.environ[var] - -# Run at module load time -clear_proxy_settings() - -# Configuration -DOCUMENT_PATHS = [ - r'doc1.txt', - r'doc2.txt', - r'doc3.txt', - r'doc4.txt', - r'doc5.txt', - r'doc6.txt' -] -EMBEDDING_MODEL = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2' -LLM_MODEL = 'gemma3' -CHUNK_SIZE = 1000 -OVERLAP = 200 -CHROMA_PERSIST_DIR = 'chroma_db' - -# Confidence thresholds -THRESHOLDS = { - 'direct_answer': 0.7, - 'rag_confidence': 0.6, - 'web_search': 0.5 -} - -def query_llm(prompt, model='gemma3'): - """Query the LLM model directly using Ollama API.""" - try: - ollama_endpoint = "http://localhost:11434/api/generate" - payload = { - "model": model, - "prompt": prompt, - "stream": False - } - response = requests.post(ollama_endpoint, json=payload) - - if response.status_code == 200: - result = response.json() - return result.get('response', '') - else: - print(f"Ollama API error: {response.status_code}") - return f"Error calling Ollama API: {response.status_code}" - except Exception as e: - print(f"Error querying LLM: {e}") - return f"Error: {str(e)}" - -class BM25Retriever: - """BM25 retriever implementation for text similarity search""" - - @classmethod - def from_documents(cls, documents): - """Create a BM25 retriever from documents""" - retriever = cls() - retriever.documents = documents - retriever.k = 4 - return retriever - - def get_relevant_documents(self, query): - """Get relevant documents using BM25 algorithm""" - # Simple BM25-like implementation - scores = [] - query_terms = set(re.findall(r'\b\w+\b', query.lower())) - - for doc in self.documents: - doc_terms = set(re.findall(r'\b\w+\b', doc.page_content.lower())) - # Calculate term overlap as a simple approximation of BM25 - overlap = len(query_terms.intersection(doc_terms)) - scores.append((doc, overlap)) - - # Sort by score and return top k - sorted_docs = [doc for doc, score in sorted(scores, key=lambda x: x[1], 
reverse=True)] - return sorted_docs[:self.k] - -class HybridRetriever: - """Hybrid retriever combining BM25 and vector search with configurable weights""" - - def __init__(self, vector_retriever, bm25_retriever, vector_weight=0.3): - """Initialize with separate retrievers and weights""" - self._vector_retriever = vector_retriever - self._bm25_retriever = bm25_retriever - self._vector_weight = vector_weight - self._bm25_weight = 1.0 - vector_weight - - def get_relevant_documents(self, query): - """Get relevant documents using weighted combination of retrievers""" - try: - # Get results from both retrievers - vector_docs = self._vector_retriever.get_relevant_documents(query) - bm25_docs = self._bm25_retriever.get_relevant_documents(query) - - # Create dictionary to track unique documents and their scores - doc_dict = {} - - # Add vector docs with their weights - for i, doc in enumerate(vector_docs): - # Score based on position (inverse rank) - score = (len(vector_docs) - i) * self._vector_weight - doc_id = doc.page_content[:50] # Use first 50 chars as a simple ID - if doc_id in doc_dict: - doc_dict[doc_id]["score"] += score - else: - doc_dict[doc_id] = {"doc": doc, "score": score} - - # Add BM25 docs with their weights - for i, doc in enumerate(bm25_docs): - # Score based on position (inverse rank) - score = (len(bm25_docs) - i) * self._bm25_weight - doc_id = doc.page_content[:50] # Use first 50 chars as a simple ID - if doc_id in doc_dict: - doc_dict[doc_id]["score"] += score - else: - doc_dict[doc_id] = {"doc": doc, "score": score} - - # Sort by combined score (highest first) - sorted_docs = sorted(doc_dict.values(), key=lambda x: x["score"], reverse=True) - - # Return just the document objects - return [item["doc"] for item in sorted_docs] - except Exception as e: - print(f"Error in hybrid retrieval: {e}") - return [] - -class AgenticQASystem: - """QA system implementing the specified architecture""" - +class ModularRAG: def __init__(self): - """Initialize the QA system with retrievers""" - # Load embeddings - self.embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL) - # Load documents and retrievers - self.documents = self.load_documents() - self.retriever = self.initialize_retriever() + self.storage_path = "./rag_data" + + if not os.path.exists(self.storage_path): + os.makedirs(self.storage_path) + os.makedirs(os.path.join(self.storage_path, "documents")) + os.makedirs(os.path.join(self.storage_path, "web_results")) + + self.documents = [] + self.web_results = [] + + # Web crawler settings + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + } + self.num_search_results = 10 + self.max_depth = 2 + self.max_links_per_page = 5 + self.max_paragraphs = 5 + + self._load_saved_data() - def load_documents(self): - """Load documents from configured paths with sliding window chunking""" - print("Loading documents...") - docs = [] - for path in DOCUMENT_PATHS: + def _load_saved_data(self): + doc_path = os.path.join(self.storage_path, "documents", "docs.pkl") + web_path = os.path.join(self.storage_path, "web_results", "web.json") + + if os.path.exists(doc_path): try: - with open(path, 'r', encoding='utf-8') as f: - text = re.sub(r'\s+', ' ', f.read()).strip() - # Sliding window chunking - chunks = [text[i:i+CHUNK_SIZE] for i in range(0, len(text), CHUNK_SIZE - OVERLAP)] - for chunk in chunks: - docs.append(Document( - page_content=chunk, - metadata={"source": os.path.basename(path)} - )) 
+ with open(doc_path, 'rb') as f: + self.documents = pickle.load(f) except Exception as e: - print(f"Error loading document {path}: {e}") - print(f"Loaded {len(docs)} document chunks") - return docs + print(f"خطا در بارگیری اسناد: {e}") + + if os.path.exists(web_path): + try: + with open(web_path, 'r', encoding='utf-8') as f: + self.web_results = json.load(f) + except Exception as e: + print(f"خطا در بارگیری نتایج وب: {e}") - def initialize_retriever(self): - """Initialize the hybrid retriever with BM25 and direct Chroma queries""" - if not self.documents: - print("No documents loaded, retriever initialization failed") - return None + def _save_documents(self): + doc_path = os.path.join(self.storage_path, "documents", "docs.pkl") + try: + with open(doc_path, 'wb') as f: + pickle.dump(self.documents, f) + except Exception as e: + print(f"خطا در ذخیره‌سازی اسناد: {e}") + + def _save_web_results(self): + web_path = os.path.join(self.storage_path, "web_results", "web.json") + try: + with open(web_path, 'w', encoding='utf-8') as f: + json.dump(self.web_results, f, ensure_ascii=False, indent=2) + except Exception as e: + print(f"خطا در ذخیره‌سازی نتایج وب: {e}") + + def load_pdf(self, file_path): + if not os.path.exists(file_path): + raise FileNotFoundError(f"فایل یافت نشد: {file_path}") try: - # Create BM25 retriever - bm25_retriever = BM25Retriever.from_documents(self.documents) - bm25_retriever.k = 4 # Top k results to retrieve + loader = PDFPlumberLoader(file_path) + documents = loader.load() - # Initialize vector store with KNN search - import shutil - if os.path.exists(CHROMA_PERSIST_DIR): - print(f"Removing existing Chroma DB to prevent dimension mismatch") - shutil.rmtree(CHROMA_PERSIST_DIR) + if documents: + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=1000, + chunk_overlap=200, + add_start_index=True + ) + chunked_docs = text_splitter.split_documents(documents) - # Create vector store directly from Chroma - print("Creating vector store...") - vector_store = Chroma.from_documents( - documents=self.documents, - embedding=self.embeddings, - persist_directory=CHROMA_PERSIST_DIR - ) - - vector_retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 4}) - print(f"Vector retriever created: {type(vector_retriever)}") - - # Create hybrid retriever - BM25 (70%) and Vector (30%) - print("Creating hybrid retriever") - hybrid_retriever = HybridRetriever(vector_retriever, bm25_retriever, vector_weight=0.3) - print("Hybrid retriever initialized successfully") - return hybrid_retriever - + self.documents.extend(chunked_docs) + self._save_documents() + return len(chunked_docs) + return 0 except Exception as e: - print(f"Error initializing retriever: {e}") - traceback.print_exc() - return None + raise Exception(f"خطا در بارگیری PDF: {e}") - def estimate_confidence(self, text, query, context=None): - """Estimate confidence of response""" - # Start with baseline confidence - confidence = 0.5 - - # Check for uncertainty markers - uncertainty_phrases = [ - "نمی‌دانم", "مطمئن نیستم", "ممکن است", "شاید", "احتمالاً", - "فکر می‌کنم", "به نظر می‌رسد" - ] - - if any(phrase in text.lower() for phrase in uncertainty_phrases): - confidence -= 0.2 - - # Check for question relevance - query_words = set(re.findall(r'\b\w+\b', query.lower())) - text_words = set(re.findall(r'\b\w+\b', text.lower())) - - # Calculate overlap between query and response - if query_words: - overlap_ratio = len(query_words.intersection(text_words)) / len(query_words) - if overlap_ratio > 0.5: - 
confidence += 0.2 - elif overlap_ratio < 0.2: - confidence -= 0.2 - - # If context provided, check context relevance - if context: - context_words = set(re.findall(r'\b\w+\b', context.lower())) - if context_words: - context_overlap = len(context_words.intersection(text_words)) / len(context_words) - if context_overlap > 0.3: - confidence += 0.2 - else: - confidence -= 0.1 - - # Ensure confidence is within [0,1] - return max(0.0, min(1.0, confidence)) - - def check_direct_knowledge(self, query): - """Check if the LLM can answer directly from its knowledge""" - print("Checking LLM's direct knowledge...") - prompt = f"""به این سوال با استفاده از دانش خود پاسخ دهید. فقط به زبان فارسی پاسخ دهید. - -سوال: {query} - -پاسخ فارسی:""" - - response = query_llm(prompt, model=LLM_MODEL) - confidence = self.estimate_confidence(response, query) - print(f"LLM direct knowledge confidence: {confidence:.2f}") - - return response, confidence - - def rag_query(self, query): - """Use RAG to retrieve and generate answer""" - if not self.retriever: - print("Retriever not initialized, skipping RAG") - return None, 0.0 - - print("Retrieving documents for RAG...") - # Retrieve relevant documents - docs = self.retriever.get_relevant_documents(query) - - if not docs: - print("No relevant documents found") - return None, 0.0 - - print(f"Retrieved {len(docs)} relevant documents") - - # Prepare context - context = "\n\n".join([doc.page_content for doc in docs]) - sources = [doc.metadata.get("source", "Unknown") for doc in docs] - - # Query LLM with context - prompt = f"""با توجه به اطلاعات زیر، به سوال پاسخ دهید. فقط به زبان فارسی پاسخ دهید. - -اطلاعات: -{context} - -سوال: {query} - -پاسخ فارسی:""" - - response = query_llm(prompt, model=LLM_MODEL) - confidence = self.estimate_confidence(response, query, context) - print(f"RAG confidence: {confidence:.2f}") - - return { - "response": response, - "confidence": confidence, - "sources": list(set(sources)) - }, confidence - - def web_search(self, query): - """Search the web for an answer""" - print("Searching web for answer...") - # Search DuckDuckGo - search_url = f"https://html.duckduckgo.com/html/?q={quote(query)}" - response = requests.get(search_url, verify=False, timeout=10) - - if response.status_code != 200: - print(f"Error searching web: HTTP {response.status_code}") - return None, 0.0 - - # Parse results - soup = BeautifulSoup(response.text, 'html.parser') - results = [] - - for element in soup.select('.result__url, .result__a')[:4]: - href = element.get('href') if 'href' in element.attrs else None + def search_duckduckgo(self, query, num_results=None): + if num_results is None: + num_results = self.num_search_results - if href and not href.startswith('/') and (href.startswith('http://') or href.startswith('https://')): - results.append(href) - elif not href and element.find('a') and 'href' in element.find('a').attrs: - href = element.find('a')['href'] - if href and not href.startswith('/'): + try: + search_url = f"https://html.duckduckgo.com/html/?q={quote(query)}" + response = requests.get(search_url, headers=self.headers, timeout=10) + + if response.status_code != 200: + print(f"خطا در جستجوی وب: HTTP {response.status_code}") + return [] + + soup = BeautifulSoup(response.text, 'html.parser') + results = [] + + for element in soup.select('.result__url, .result__a'): + href = element.get('href') if 'href' in element.attrs else None + + if href and not href.startswith('/') and (href.startswith('http://') or href.startswith('https://')): results.append(href) + elif 
not href and element.find('a') and 'href' in element.find('a').attrs: + href = element.find('a')['href'] + if href and not href.startswith('/'): + results.append(href) + + unique_results = list(set(results)) + return unique_results[:num_results] - if not results: - print("No web results found") - return None, 0.0 - - # Crawl top results - web_content = [] - for url in results[:3]: - try: - headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} - page = requests.get(url, headers=headers, timeout=10, verify=False) - page.raise_for_status() - - soup = BeautifulSoup(page.text, 'html.parser') - - # Remove non-content elements - for tag in ['script', 'style', 'nav', 'footer', 'header']: - for element in soup.find_all(tag): - element.decompose() - - # Get paragraphs - paragraphs = [p.get_text(strip=True) for p in soup.find_all('p') - if len(p.get_text(strip=True)) > 20] - - if paragraphs: - web_content.append(f"[Source: {url}] " + " ".join(paragraphs[:5])) - except Exception as e: - print(f"Error crawling {url}: {e}") - - if not web_content: - print("No useful content found from web results") - return None, 0.0 - - # Query LLM with web content - context = "\n\n".join(web_content) - prompt = f"""با توجه به اطلاعات زیر که از وب بدست آمده، به سوال پاسخ دهید. فقط به زبان فارسی پاسخ دهید. - -اطلاعات: -{context} - -سوال: {query} - -پاسخ فارسی:""" - - response = query_llm(prompt, model=LLM_MODEL) - confidence = self.estimate_confidence(response, query, context) - print(f"Web search confidence: {confidence:.2f}") - - return { - "response": response, - "confidence": confidence, - "sources": results[:3] - }, confidence + except Exception as e: + print(f"خطا در جستجوی DuckDuckGo: {e}") + return [] - def get_answer(self, query): - """Main method to get an answer following the specified architecture""" - print(f"Processing query: {query}") + def crawl_page(self, url, depth=0): + if depth > self.max_depth: + return None, [] - # STEP 1: Try direct LLM knowledge - direct_response, direct_confidence = self.check_direct_knowledge(query) + try: + response = requests.get(url, headers=self.headers, timeout=10) + response.raise_for_status() + + soup = BeautifulSoup(response.text, 'html.parser') + + title = soup.title.string if soup.title else "بدون عنوان" + + paragraphs = [] + for p in soup.find_all('p'): + text = p.get_text(strip=True) + if len(text) > 50: + paragraphs.append(text) + if len(paragraphs) >= self.max_paragraphs: + break + + links = [] + for a in soup.find_all('a', href=True): + href = a['href'] + if href.startswith('http') and href != url: + links.append(href) + if len(links) >= self.max_links_per_page: + break + + content = { + "url": url, + "title": title, + "paragraphs": paragraphs + } + + return content, links - if direct_confidence >= THRESHOLDS['direct_answer']: - print("Using direct LLM knowledge (high confidence)") - return f"{direct_response}\n\n[Source: LLM Knowledge, Confidence: {direct_confidence:.2f}]" - - # STEP 2: Try RAG with local documents - rag_result, rag_confidence = self.rag_query(query) - - if rag_result and rag_confidence >= THRESHOLDS['rag_confidence']: - print("Using RAG response (sufficient confidence)") - sources_text = ", ".join(rag_result["sources"][:3]) - return f"{rag_result['response']}\n\n[Source: Local Documents, Confidence: {rag_confidence:.2f}, Sources: {sources_text}]" - - # STEP 3: Try web search - web_result, web_confidence = self.web_search(query) - - if web_result and web_confidence >= THRESHOLDS['web_search']: - print("Using 
web search response (sufficient confidence)") - sources_text = ", ".join(web_result["sources"]) - return f"{web_result['response']}\n\n[Source: Web Search, Confidence: {web_confidence:.2f}, Sources: {sources_text}]" - - # STEP 4: Fall back to direct response with warning - print("No high-confidence source found, using direct response with warning") - return f"{direct_response}\n\n[Warning: Low confidence ({direct_confidence:.2f}). Please verify information.]" - -# Simple API functions -def get_answer(query): - """Get an answer for a query""" - system = AgenticQASystem() - return system.get_answer(query) - -# Main entry point -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="QA System") + except Exception as e: + print(f"خطا در خزش صفحه {url}: {e}") + return None, [] - mode_group = parser.add_mutually_exclusive_group(required=True) - mode_group.add_argument("--query", "-q", help="Query to answer") - mode_group.add_argument("--interactive", "-i", action="store_true", help="Run in interactive chat mode") - mode_group.add_argument("--test", "-t", action="store_true", help="Run tests") - - args = parser.parse_args() - - if args.interactive: - # Simple interactive mode without memory - qa_system = AgenticQASystem() - print("=== QA System ===") - print("Type 'exit' or 'quit' to end") + def crawl_website(self, start_url, max_pages=10): + visited = set() + to_visit = [start_url] + contents = [] - while True: - user_input = input("\nYou: ") - if not user_input.strip(): + while to_visit and len(visited) < max_pages: + current_url = to_visit.pop(0) + + if current_url in visited: continue + + content, links = self.crawl_page(current_url) + + visited.add(current_url) + + if content and content["paragraphs"]: + contents.append(content) + + for link in links: + if link not in visited and link not in to_visit: + to_visit.append(link) + + time.sleep(1) + + return contents + + def crawl_web(self, query): + urls = self.search_duckduckgo(query) + + if not urls: + print("هیچ نتیجه‌ای یافت نشد.") + return [] + + all_results = [] + for url in urls[:3]: # Limit to first 3 URLs for efficiency + content, links = self.crawl_page(url) + if content and content["paragraphs"]: + all_results.append(content) - if user_input.lower() in ['exit', 'quit', 'خروج']: - break - - response = qa_system.get_answer(user_input) - print(f"\nBot: {response}") - elif args.query: - qa_system = AgenticQASystem() - print(qa_system.get_answer(args.query)) - elif args.test: - print("Running tests...") \ No newline at end of file + # Follow links from the main page (recursive crawling) + for link in links[:2]: # Limit to first 2 links + sub_content, _ = self.crawl_page(link, depth=1) + if sub_content and sub_content["paragraphs"]: + all_results.append(sub_content) + time.sleep(1) + + time.sleep(1) + + self.web_results = all_results + self._save_web_results() + + # Convert web results to documents for RAG + web_docs = [] + for result in all_results: + text = f"[{result['title']}]\n" + "\n".join(result['paragraphs']) + web_docs.append({"page_content": text, "metadata": {"source": result['url']}}) + + return all_results, web_docs + + def build_retriever(self, documents): + if not documents: + return None + + # Create BM25 retriever + bm25_retriever = BM25Retriever.from_documents(documents) + bm25_retriever.k = 3 # Return top 3 results + + return bm25_retriever + + def get_relevant_documents(self, query, documents): + retriever = self.build_retriever(documents) + if not retriever: + return [] + + return 
retriever.get_relevant_documents(query) + + def extract_context_from_documents(self, query): + if not self.documents: + return None + + relevant_docs = self.get_relevant_documents(query, self.documents) + + if not relevant_docs: + return None + + context = "\n\n".join([doc.page_content for doc in relevant_docs]) + return context + + def extract_context_from_web(self, web_results, web_docs, query): + if not web_results or not web_docs: + return None, [] + + # Try to use the retriever for better results + if web_docs: + relevant_docs = self.get_relevant_documents(query, web_docs) + if relevant_docs: + context = "\n\n".join([doc.page_content for doc in relevant_docs]) + sources = [doc.metadata.get("source", "") for doc in relevant_docs if "source" in doc.metadata] + return context, sources + + # Fall back to simple extraction if retriever fails + contexts = [] + sources = [] + + for doc in web_results: + context_text = "\n".join(doc["paragraphs"]) + contexts.append(f"[{doc['title']}] {context_text}") + sources.append(doc['url']) + + context = "\n\n".join(contexts) + return context, sources + +def get_context(query, crawl_params=None): + """ + سیستم RAG مدولار برای پاسخگویی به سوالات با استفاده از اسناد و جستجوی وب + + پارامترها: + query (str): سوال به زبان فارسی + crawl_params (dict, optional): پارامترهای خزش وب + - max_depth: حداکثر عمق خزش + - max_links_per_page: حداکثر تعداد لینک‌های استخراج شده از هر صفحه + - max_paragraphs: حداکثر تعداد پاراگراف‌های استخراج شده از هر صفحه + - num_search_results: تعداد نتایج جستجو + + خروجی: + dict: نتیجه جستجو شامل متن و منابع + """ + rag = ModularRAG() + + # Configure crawling parameters if provided + if crawl_params: + if 'max_depth' in crawl_params: + rag.max_depth = crawl_params['max_depth'] + if 'max_links_per_page' in crawl_params: + rag.max_links_per_page = crawl_params['max_links_per_page'] + if 'max_paragraphs' in crawl_params: + rag.max_paragraphs = crawl_params['max_paragraphs'] + if 'num_search_results' in crawl_params: + rag.num_search_results = crawl_params['num_search_results'] + + # First try to get context from documents + doc_context = rag.extract_context_from_documents(query) + + if doc_context: + return { + "has_context": True, + "context": doc_context, + "source": "documents", + "language": "fa" + } + + # Fall back to web search + web_results, web_docs = rag.crawl_web(query) + + if web_results: + web_context, sources = rag.extract_context_from_web(web_results, web_docs, query) + return { + "has_context": True, + "context": web_context, + "source": "web", + "sources": sources, + "language": "fa" + } + + # No context found + return { + "has_context": False, + "context": "متأسفانه اطلاعاتی در مورد سوال شما یافت نشد.", + "source": "none", + "language": "fa" + }
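
Usage sketch (illustrative, not part of the diff): one way the new get_context() entry point introduced by this patch could be exercised, assuming enhanced_combined.py is importable as a module and an Ollama server is running locally. The downstream LLM call only mirrors the endpoint and payload shape of the query_llm() helper that this patch removes; the question, crawl settings, and PDF path are hypothetical.

    import requests
    from enhanced_combined import ModularRAG, get_context

    # Optional: index a local PDF first so document retrieval has something to
    # search. The path is a placeholder; load_pdf() persists chunks under ./rag_data.
    # ModularRAG().load_pdf("report.pdf")

    # Hypothetical Persian question and crawl settings, for illustration only.
    result = get_context(
        "پایتخت ایران کجاست؟",
        crawl_params={"max_depth": 1, "max_links_per_page": 3, "num_search_results": 5},
    )

    if result["has_context"]:
        # Build a Persian prompt from the retrieved context, then ask the model.
        # Endpoint, model name, and payload follow the removed query_llm() helper.
        prompt = (
            "با توجه به اطلاعات زیر به سوال پاسخ دهید.\n\n"
            f"{result['context']}\n\n"
            "سوال: پایتخت ایران کجاست؟"
        )
        response = requests.post(
            "http://localhost:11434/api/generate",
            json={"model": "gemma3", "prompt": prompt, "stream": False},
            timeout=120,
        )
        print(response.json().get("response", ""))
        print("source:", result["source"], result.get("sources", []))
    else:
        # Fallback message returned by get_context() when nothing was found.
        print(result["context"])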