diff --git a/organic_chemistry_crawler.py b/organic_chemistry_crawler.py new file mode 100644 index 0000000..2e8d07c --- /dev/null +++ b/organic_chemistry_crawler.py @@ -0,0 +1,1021 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import re +import json +import time +import requests +import argparse +import numpy as np +import traceback +import datetime +from urllib.parse import urljoin, urlparse, quote +import logging +from concurrent.futures import ThreadPoolExecutor +from bs4 import BeautifulSoup +from langchain_community.vectorstores import Chroma +from langchain_core.documents import Document +from langchain_core.retrievers import BaseRetriever +from langchain_community.retrievers import BM25Retriever +from langchain_community.embeddings import HuggingFaceEmbeddings + +# Disable proxy settings that might cause connection issues +def clear_proxy_settings(): + """Remove proxy environment variables that might cause connection issues.""" + for var in ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"]: + if var in os.environ: + print(f"Removing proxy env var: {var}") + del os.environ[var] + +# Run at module load time +clear_proxy_settings() + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler("organic_chemistry_crawler.log") + ] +) +logger = logging.getLogger(__name__) + +# Configuration +class Config: + # Search settings + SEARCH_ENGINE = "combined" # Options: "duckduckgo", "arxiv", "combined" + NUM_SEARCH_RESULTS = 10 + + # Crawling settings + MAX_DEPTH = 1 # How deep to follow links from initial pages + MAX_LINKS_PER_PAGE = 5 # Max links to follow from each page + MAX_TOTAL_PAGES = 20 # Max total pages to crawl + REQUEST_TIMEOUT = 10 # Seconds + REQUEST_DELAY = 1 # Seconds between requests + + # Content extraction settings + MIN_CONTENT_LENGTH = 100 # Minimum characters for content to be considered valid + + # RAG settings + CHUNK_SIZE = 1000 + OVERLAP = 200 + CONFIDENCE_THRESHOLD = 0.6 + + # Embedding and LLM settings + EMBEDDING_MODEL = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2' + LLM_MODEL = 'gemma3' + CHROMA_PERSIST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chroma_db') + SEMANTIC_MEMORY_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'semantic_memory') + + # Confidence thresholds + THRESHOLDS = { + 'direct_knowledge': 0.6, + 'rag': 0.6, + 'web_search': 0.5, + 'memory_match': 0.15 + } + + # Output settings + OUTPUT_LANGUAGE = "fa" # Options: "fa" (Farsi), "en" (English) + + # Organic chemistry specific sites to prioritize + PRIORITY_DOMAINS = [ + "pubchem.ncbi.nlm.nih.gov", + "chemistrysteps.com", + "masterorganicchemistry.com", + "chemguide.co.uk", + "organic-chemistry.org", + "chemistryworld.com", + "chemspider.com", + "organicchemistrytutor.com", + "chem.libretexts.org", + "chemhelper.com", + "arxiv.org", + "jahaneshimi.com", + "blog.faradars.org", + "en.wikipedia.org", + "fa.wikipedia.org" + ] + +class OrganicChemistryCrawler: + """Crawler specialized for organic chemistry information with enhanced RAG capabilities""" + + def __init__(self, config=None): + """Initialize the crawler with configuration""" + self.config = config or Config() + self.visited_urls = set() + self.crawled_content = {} # url -> content + self.url_queue = [] + + # Initialize semantic memory + os.makedirs(self.config.SEMANTIC_MEMORY_DIR, exist_ok=True) + self.semantic_memory 
= SemanticMemory(self.config.SEMANTIC_MEMORY_DIR)
+
+        # Initialize embeddings and vector store
+        try:
+            self.embeddings = HuggingFaceEmbeddings(model_name=self.config.EMBEDDING_MODEL)
+            logging.info(f"Initialized embeddings with model: {self.config.EMBEDDING_MODEL}")
+        except Exception as e:
+            logging.error(f"Error initializing embeddings: {e}")
+            self.embeddings = None
+
+        # RAG components will be initialized after crawling
+        self.retriever = None
+        self.vector_store = None
+
+    def search_duckduckgo(self, query):
+        """Search DuckDuckGo for organic chemistry information"""
+        # Add organic chemistry context to the query
+        if "organic chemistry" not in query.lower():
+            search_query = f"{query} organic chemistry"
+        else:
+            search_query = query
+
+        logging.info(f"Searching DuckDuckGo for: {search_query}")
+
+        # DuckDuckGo doesn't have an official API, so we'll use their HTML search
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
+        }
+
+        # URL encode the query
+        encoded_query = search_query.replace(' ', '+')
+        search_url = f"https://duckduckgo.com/html/?q={encoded_query}"
+
+        try:
+            response = requests.get(search_url, headers=headers, timeout=self.config.REQUEST_TIMEOUT)
+            response.raise_for_status()
+
+            # Use BeautifulSoup for more reliable parsing
+            soup = BeautifulSoup(response.text, 'html.parser')
+            result_urls = []
+
+            # Get results from the result items
+            for result in soup.select('.result__a'):
+                href = result.get('href')
+                if href and href.startswith('http'):
+                    result_urls.append(href)
+
+            if not result_urls:
+                # Fallback to regex pattern over the raw HTML
+                url_pattern = r'<a[^>]*class="[^"]*result__a[^"]*"[^>]*href="([^"]+)"'
+                result_urls = re.findall(url_pattern, response.text)
+
+            results = []
+            for href in result_urls:
+                if href and href.startswith('http'):
+                    results.append(href)
+
+            # Prioritize results from known chemistry domains
+            prioritized_results = []
+            other_results = []
+
+            for url in results:
+                domain = urlparse(url).netloc
+                if any(priority_domain in domain for priority_domain in self.config.PRIORITY_DOMAINS):
+                    prioritized_results.append(url)
+                else:
+                    other_results.append(url)
+
+            # Combine prioritized and other results
+            combined_results = []
+            seen_urls = set()
+
+            # First add results from the priority chemistry domains
+            for url in prioritized_results:
+                if url not in seen_urls:
+                    combined_results.append(url)
+                    seen_urls.add(url)
+
+            # Then add other results
+            for url in other_results:
+                if url not in seen_urls:
+                    combined_results.append(url)
+                    seen_urls.add(url)
+
+            return combined_results[:self.config.NUM_SEARCH_RESULTS]
+
+        except Exception as e:
+            logging.error(f"Error searching DuckDuckGo: {e}")
+            return []
+
+    def search_arxiv(self, query):
+        """Search arXiv for organic chemistry papers"""
+        logging.info(f"Searching arXiv for: {query}")
+
+        # Add organic chemistry context to the query
+        if "organic chemistry" not in query.lower():
+            search_query = f"{query} organic chemistry"
+        else:
+            search_query = query
+
+        # URL encode the query
+        encoded_query = search_query.replace(' ', '+')
+
+        # arXiv API endpoint
+        search_url = f"http://export.arxiv.org/api/query?search_query=all:{encoded_query}&start=0&max_results={self.config.NUM_SEARCH_RESULTS}"
+
+        try:
+            response = requests.get(search_url, timeout=self.config.REQUEST_TIMEOUT)
+            response.raise_for_status()
+
+            # Parse the XML response using regex
+            xml = response.text
+
+            # Extract entry links using regex
+            entry_pattern = r'<entry>.*?<id>(.*?)</id>.*?</entry>'
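Since the arXiv response is well-formed Atom XML, a parser is sturdier than the entry/id regex above. A minimal sketch using only the standard library (the helper name and its standalone form are illustrative, not part of this patch; the namespace is the standard Atom one used by the arXiv API):

```python
import xml.etree.ElementTree as ET

def extract_arxiv_entry_ids(atom_xml):
    """Return the <id> URL of each <entry> in an arXiv Atom feed."""
    ns = {"atom": "http://www.w3.org/2005/Atom"}  # Atom namespace
    root = ET.fromstring(atom_xml)
    ids = []
    for entry in root.findall("atom:entry", ns):
        id_elem = entry.find("atom:id", ns)
        if id_elem is not None and id_elem.text:
            ids.append(id_elem.text.strip())
    return ids
```

If adopted, the call in search_arxiv would simply be results = extract_arxiv_entry_ids(xml) in place of the regex findall below.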
+ entries = re.findall(entry_pattern, xml, re.DOTALL) + + results = [] + for entry_id in entries: + if entry_id: + results.append(entry_id) + + return results + + except Exception as e: + logging.error(f"Error searching arXiv: {e}") + return [] + + def search(self, query): + """Search for organic chemistry information using configured search engine""" + if self.config.SEARCH_ENGINE == "duckduckgo": + return self.search_duckduckgo(query) + elif self.config.SEARCH_ENGINE == "arxiv": + return self.search_arxiv(query) + elif self.config.SEARCH_ENGINE == "combined": + # Use both search engines and combine results + duckduckgo_results = self.search_duckduckgo(query) + arxiv_results = self.search_arxiv(query) + + # Combine and deduplicate results + combined_results = [] + seen_urls = set() + + # First add DuckDuckGo results + for url in duckduckgo_results: + if url not in seen_urls: + combined_results.append(url) + seen_urls.add(url) + + # Then add arXiv results + for url in arxiv_results: + if url not in seen_urls: + combined_results.append(url) + seen_urls.add(url) + + return combined_results[:self.config.NUM_SEARCH_RESULTS] + else: + logging.error(f"Unknown search engine: {self.config.SEARCH_ENGINE}") + return [] + + def extract_content(self, html, url): + """Extract relevant content from HTML using BeautifulSoup""" + soup = BeautifulSoup(html, 'html.parser') + + # Remove script, style, and nav elements + for tag in ['script', 'style', 'nav', 'header', 'footer']: + for element in soup.find_all(tag): + element.decompose() + + # Extract title + title = soup.title.text.strip() if soup.title else urlparse(url).path + + # Try to find main content + content = "" + + # Try article tags first + article_content = [] + for article in soup.find_all('article'): + text = article.get_text(strip=True) + if len(text) > self.config.MIN_CONTENT_LENGTH: + article_content.append(text) + + if article_content: + content = "\n\n".join(article_content) + else: + # Try content divs + for div in soup.find_all('div', class_=lambda c: c and any(term in str(c).lower() for term in ['content', 'main', 'article', 'body'])): + text = div.get_text(strip=True) + if len(text) > self.config.MIN_CONTENT_LENGTH: + content += text + "\n\n" + + # If still no content, extract all paragraphs + if not content or len(content) < self.config.MIN_CONTENT_LENGTH: + paragraphs = [p.get_text(strip=True) for p in soup.find_all('p') if len(p.get_text(strip=True)) > 20] + if paragraphs: + content = "\n\n".join(paragraphs) + + # Clean up content + content = re.sub(r'\s+', ' ', content).strip() + + return { + "title": title, + "content": content, + "url": url + } + + def extract_links(self, html, base_url): + """Extract links from HTML to follow""" + soup = BeautifulSoup(html, 'html.parser') + links = [] + + for a_tag in soup.find_all('a', href=True): + href = a_tag['href'] + + # Skip empty links, anchors, or javascript + if not href or href.startswith('#') or href.startswith('javascript:'): + continue + + # Convert relative URLs to absolute + absolute_url = urljoin(base_url, href) + + # Skip non-HTTP links + if not absolute_url.startswith(('http://', 'https://')): + continue + + # Skip already visited URLs + if absolute_url in self.visited_urls: + continue + + # Prioritize chemistry domains + domain = urlparse(absolute_url).netloc + if any(priority_domain in domain for priority_domain in self.config.PRIORITY_DOMAINS): + links.insert(0, absolute_url) # Add to beginning of list + else: + links.append(absolute_url) + + # Return limited number of 
links + return links[:self.config.MAX_LINKS_PER_PAGE] + + def crawl_url(self, url, depth=0): + """Crawl a single URL and extract content""" + if url in self.visited_urls or len(self.crawled_content) >= self.config.MAX_TOTAL_PAGES: + return + + logging.info(f"Crawling: {url} (depth {depth})") + self.visited_urls.add(url) + + try: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + } + response = requests.get(url, headers=headers, timeout=self.config.REQUEST_TIMEOUT) + response.raise_for_status() + + # Skip non-HTML content + content_type = response.headers.get('Content-Type', '') + if 'text/html' not in content_type and 'application/xhtml+xml' not in content_type: + logging.info(f"Skipping non-HTML content: {url} ({content_type})") + return + + # Extract content + content_data = self.extract_content(response.text, url) + + # Only save if we have meaningful content + if len(content_data["content"]) > self.config.MIN_CONTENT_LENGTH: + self.crawled_content[url] = content_data + + # Follow links if we haven't reached max depth + if depth < self.config.MAX_DEPTH: + links = self.extract_links(response.text, url) + for link in links: + if link not in self.visited_urls: + self.url_queue.append((link, depth + 1)) + + # Respect rate limits + time.sleep(self.config.REQUEST_DELAY) + + except Exception as e: + logging.error(f"Error crawling {url}: {e}") + + def process_queue(self): + """Process the URL queue with multithreading""" + with ThreadPoolExecutor(max_workers=5) as executor: + while self.url_queue and len(self.crawled_content) < self.config.MAX_TOTAL_PAGES: + # Get a batch of URLs to process + batch = [] + while self.url_queue and len(batch) < 5: + batch.append(self.url_queue.pop(0)) + + # Process the batch + futures = [executor.submit(self.crawl_url, url, depth) for url, depth in batch] + for future in futures: + future.result() # Wait for completion + + def crawl(self, query): + """Search and crawl for information about the query""" + # Step 1: Search for initial URLs + initial_urls = self.search(query) + if not initial_urls: + logging.warning(f"No search results found for query: {query}") + return {} + + # Step 2: Initialize crawling queue + self.url_queue = [(url, 0) for url in initial_urls] + self.visited_urls = set() + self.crawled_content = {} + + # Step 3: Process the queue + self.process_queue() + + # Step 4: Return the crawled content + logging.info(f"Crawling complete. 
Found {len(self.crawled_content)} pages with content.") + return self.crawled_content + + def chunk_text(self, text, chunk_size=None, overlap=None): + """Split text into chunks with overlap""" + if chunk_size is None: + chunk_size = self.config.CHUNK_SIZE + if overlap is None: + overlap = self.config.OVERLAP + + # If text is shorter than chunk size, return as is + if len(text) <= chunk_size: + return [text] + + chunks = [] + start = 0 + + while start < len(text): + # Get chunk of specified size + end = start + chunk_size + + # Adjust end to avoid cutting words + if end < len(text): + # Try to find a space to break at + while end > start and text[end] != ' ': + end -= 1 + if end == start: # If no space found, use the original end + end = start + chunk_size + + # Add chunk to list + chunks.append(text[start:end]) + + # Move start position for next chunk, considering overlap + start = end - overlap + + return chunks + + def prepare_documents(self): + """Prepare crawled content as documents for RAG using LangChain Document format""" + documents = [] + + for url, data in self.crawled_content.items(): + content = data["content"] + title = data["title"] + + # Chunk the content + chunks = self.chunk_text(content) + + # Create documents from chunks + for i, chunk in enumerate(chunks): + doc = Document( + page_content=chunk, + metadata={ + "source": url, + "title": title, + "chunk": i + 1, + "total_chunks": len(chunks) + } + ) + documents.append(doc) + + return documents + + def initialize_retriever(self, documents): + """Initialize the hybrid retriever with vector search and BM25""" + if not documents or not self.embeddings: + logging.error("No documents or embeddings available for retriever initialization") + return None + + try: + # Create BM25 retriever + bm25_retriever = BM25Retriever.from_documents(documents) + bm25_retriever.k = 5 # Top k results to retrieve + + # Initialize or recreate vector store + if os.path.exists(self.config.CHROMA_PERSIST_DIR): + import shutil + logging.info("Removing existing Chroma DB to prevent dimension mismatch") + shutil.rmtree(self.config.CHROMA_PERSIST_DIR) + + # Create vector store + os.makedirs(self.config.CHROMA_PERSIST_DIR, exist_ok=True) + vector_store = Chroma.from_documents( + documents=documents, + embedding=self.embeddings, + persist_directory=self.config.CHROMA_PERSIST_DIR + ) + self.vector_store = vector_store + + # Create vector retriever + vector_retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 5}) + + # Create hybrid retriever (BM25 70%, Vector 30%) + hybrid_retriever = HybridRetriever(vector_retriever, bm25_retriever, vector_weight=0.3) + logging.info("Hybrid retriever initialized successfully") + + return hybrid_retriever + except Exception as e: + logging.error(f"Error initializing retriever: {e}") + traceback.print_exc() + return None + + def check_corrections(self, query): + """Check if a correction exists for this query using semantic memory""" + logging.info("Checking semantic memory for corrections...") + + # Use semantic memory to find similar queries + stored_query, answer, similarity = self.semantic_memory.retrieve_memory(query) + + if stored_query and answer: + logging.info(f"Found semantic match in memory with similarity: {similarity:.2f}") + logging.info(f"Original query: '{stored_query}'") + logging.info(f"Current query: '{query}'") + return answer, f"Semantic Memory (similarity: {similarity:.2f})" + + return None, None + + def estimate_confidence(self, text, query, context=None): + """Estimate 
confidence of response using more sophisticated analysis""" + # Start with baseline confidence + confidence = 0.5 + + # Check for uncertainty markers + uncertainty_phrases = [ + "نمی‌دانم", "مطمئن نیستم", "ممکن است", "شاید", "احتمالاً", + "فکر می‌کنم", "به نظر می‌رسد", "I don't know", "not sure", + "might be", "perhaps", "possibly", "it seems" + ] + + if any(phrase in text.lower() for phrase in uncertainty_phrases): + confidence -= 0.2 + + # Check for question relevance + query_words = set(re.findall(r'\b\w+\b', query.lower())) + text_words = set(re.findall(r'\b\w+\b', text.lower())) + + # Calculate overlap between query and response + if query_words: + overlap_ratio = len(query_words.intersection(text_words)) / len(query_words) + if overlap_ratio > 0.5: + confidence += 0.2 + elif overlap_ratio < 0.2: + confidence -= 0.2 + + # Check for chemistry-specific terms + chemistry_terms = [ + "molecule", "compound", "reaction", "bond", "carbon", "hydrogen", "oxygen", + "nitrogen", "synthesis", "organic", "chemical", "structure", "formula", + "مولکول", "ترکیب", "واکنش", "پیوند", "کربن", "هیدروژن", "اکسیژن", + "نیتروژن", "سنتز", "آلی", "شیمیایی", "ساختار", "فرمول" + ] + + chem_term_count = sum(1 for term in chemistry_terms if term.lower() in text.lower()) + term_factor = min(chem_term_count / 5, 1.0) * 0.2 + confidence += term_factor + + # If context provided, check context relevance + if context: + context_words = set(re.findall(r'\b\w+\b', context.lower())) + if context_words: + context_overlap = len(context_words.intersection(text_words)) / len(context_words) + if context_overlap > 0.3: + confidence += 0.2 + else: + confidence -= 0.1 + + # Higher confidence for longer, more detailed responses + if len(text) > 500: + confidence += 0.1 + elif len(text) < 100: + confidence -= 0.1 + + # Ensure confidence is within [0,1] + return max(0.0, min(1.0, confidence)) + + def check_direct_knowledge(self, query): + """Check if the LLM can answer directly from its knowledge""" + logging.info("Checking LLM's direct knowledge...") + try: + output_language = "فارسی" if self.config.OUTPUT_LANGUAGE == "fa" else "English" + + prompt = f"""به این سوال در مورد شیمی آلی با استفاده از دانش خود پاسخ دهید. به زبان {output_language} پاسخ دهید. 
+
+سوال: {query}
+
+پاسخ:"""
+
+            response = query_llm(prompt, model=self.config.LLM_MODEL)
+            confidence = self.estimate_confidence(response, query)
+            logging.info(f"LLM direct knowledge confidence: {confidence:.2f}")
+
+            return response, confidence
+        except Exception as e:
+            logging.error(f"Error in direct knowledge check: {e}")
+            return "Error processing response", 0.0
+
+    def rag_query(self, query):
+        """Use RAG to retrieve and generate answer based on crawled content"""
+        # Prepare documents from crawled content
+        documents = self.prepare_documents()
+
+        if not documents:
+            logging.warning("No documents available for RAG")
+            # Always return (answer, confidence, sources) so get_answer can unpack three values
+            if self.config.OUTPUT_LANGUAGE == "fa":
+                return "متاسفانه اطلاعاتی در مورد این موضوع پیدا نکردم.", 0.0, []
+            else:
+                return "I couldn't find any information about that topic.", 0.0, []
+
+        # Initialize retriever if not already done
+        if not self.retriever:
+            self.retriever = self.initialize_retriever(documents)
+
+        if not self.retriever:
+            logging.error("Failed to initialize retriever")
+            if self.config.OUTPUT_LANGUAGE == "fa":
+                return "خطا در پردازش اطلاعات رخ داده است.", 0.0, []
+            else:
+                return "An error occurred while processing information.", 0.0, []
+
+        try:
+            # Retrieve relevant documents
+            relevant_docs = self.retriever.get_relevant_documents(query)
+
+            if not relevant_docs:
+                logging.warning("No relevant documents found")
+                if self.config.OUTPUT_LANGUAGE == "fa":
+                    return "متاسفانه اطلاعات مرتبطی پیدا نکردم.", 0.0, []
+                else:
+                    return "I couldn't find any relevant information.", 0.0, []
+
+            # Prepare context from retrieved documents
+            context = "\n\n".join([
+                f"Source: {doc.metadata.get('title')} ({doc.metadata.get('source')})\n{doc.page_content}"
+                for doc in relevant_docs[:5]
+            ])
+
+            # Extract unique sources
+            sources = list(set(doc.metadata.get('source') for doc in relevant_docs[:5]))
+
+            # Prepare prompt for LLM
+            output_language = "فارسی" if self.config.OUTPUT_LANGUAGE == "fa" else "English"
+
+            prompt = f"""با توجه به اطلاعات زیر، به سوال در مورد شیمی آلی پاسخ دهید. به زبان {output_language} پاسخ دهید.
+
+اطلاعات:
+{context}
+
+سوال: {query}
+
+پاسخ:"""
+
+            # Query LLM
+            response = query_llm(prompt, model=self.config.LLM_MODEL)
+
+            # Estimate confidence
+            confidence = self.estimate_confidence(response, query, context)
+            logging.info(f"RAG confidence: {confidence:.2f}")
+
+            # Add source attribution
+            if self.config.OUTPUT_LANGUAGE == "fa":
+                response += f"\n\nاین اطلاعات از {len(sources)} منبع گردآوری شده است."
+            else:
+                response += f"\n\nThis information was compiled from {len(sources)} sources."
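For orientation, the retrieval step used above relies on the HybridRetriever defined further down: each document is scored by its inverse rank in the two result lists, weighted 0.7 for BM25 and 0.3 for vector search. A self-contained toy version of that fusion, with hypothetical document IDs and the helper name invented here:

```python
def fuse_by_rank(vector_ids, bm25_ids, vector_weight=0.3):
    """Position-based score fusion: a high rank in either list raises the score."""
    scores = {}
    for i, doc_id in enumerate(vector_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + (len(vector_ids) - i) * vector_weight
    for i, doc_id in enumerate(bm25_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + (len(bm25_ids) - i) * (1.0 - vector_weight)
    return sorted(scores, key=scores.get, reverse=True)

# fuse_by_rank(["a", "b", "c"], ["b", "c", "a"]) -> ["b", "c", "a"]
# scores: a = 0.9 + 0.7 = 1.6, b = 0.6 + 2.1 = 2.7, c = 0.3 + 1.4 = 1.7
```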
+ + return response, confidence, sources + except Exception as e: + logging.error(f"Error in RAG query: {e}") + traceback.print_exc() + if self.config.OUTPUT_LANGUAGE == "fa": + return "خطا در پردازش اطلاعات رخ داده است.", 0.0, [] + else: + return "An error occurred while processing information.", 0.0, [] + + def get_answer(self, query): + """Main method to get an answer following the agent-based architecture""" + logging.info(f"Processing query: {query}") + + # STEP 1: Check corrections memory + correction, source = self.check_corrections(query) + if correction: + return f"{correction}\n\n[Source: {source}]" + + # STEP 2: Try direct LLM knowledge + direct_response, direct_confidence = self.check_direct_knowledge(query) + + if direct_confidence >= self.config.THRESHOLDS['direct_knowledge']: + logging.info("Using direct LLM knowledge (high confidence)") + return f"{direct_response}\n\n[Source: LLM Knowledge, Confidence: {direct_confidence:.2f}]" + + # STEP 3: Crawl and index content if not already done + if not self.crawled_content: + self.crawl(query) + + # STEP 4: Try RAG with crawled documents + rag_response, rag_confidence, sources = self.rag_query(query) + + if rag_confidence >= self.config.THRESHOLDS['rag']: + logging.info("Using RAG response (sufficient confidence)") + sources_text = ", ".join(sources[:3]) + return f"{rag_response}\n\n[Source: Web Content, Confidence: {rag_confidence:.2f}, Sources: {sources_text}]" + + # STEP 5: Fall back to direct response with warning + logging.info("No high-confidence source found, using direct response with warning") + return f"{direct_response}\n\n[Warning: Low confidence ({direct_confidence:.2f}). Please verify this information.]" + + def add_correction(self, incorrect_query, correct_answer): + """Add a correction to semantic memory""" + try: + # Add the correction to semantic memory + success = self.semantic_memory.add_memory( + incorrect_query, + correct_answer, + {"type": "correction", "timestamp": str(datetime.datetime.now())} + ) + + if success: + logging.info(f"Added correction for: '{incorrect_query}'") + + return success + except Exception as e: + logging.error(f"Error adding correction: {e}") + return False + + def save_results(self, query, output_file=None, answer=None, confidence=None): + """Save crawled results to a JSON file and answer to a text file""" + # Create a safe filename based on the query + safe_query = re.sub(r'[^\w\s-]', '', query).strip().lower() + safe_query = re.sub(r'[-\s]+', '_', safe_query) + timestamp = int(time.time()) + + # Save crawled content to JSON + if not output_file: + output_file = f"organic_chemistry_{safe_query}_{timestamp}.json" + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump({ + "query": query, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "results": self.crawled_content + }, f, ensure_ascii=False, indent=2) + + logging.info(f"Results saved to {output_file}") + + # Save answer to text file if provided + if answer: + answer_file = f"organic_chemistry_results.txt" + + # Append to existing results file + with open(answer_file, 'a', encoding='utf-8') as f: + f.write(f"\n{'='*80}\n") + if self.config.OUTPUT_LANGUAGE == "fa": + f.write(f"سوال: {query}\n") + if confidence is not None: + f.write(f"اطمینان: {confidence:.2f}\n") + else: + f.write(f"Query: {query}\n") + if confidence is not None: + f.write(f"Confidence: {confidence:.2f}\n") + f.write(f"\n{answer}\n") + + logging.info(f"Answer saved to {answer_file}") + return output_file, answer_file + + return output_file + +# 
Simple direct LLM query function +def query_llm(prompt, model='gemma3'): + """Query the LLM model directly.""" + try: + # In a real implementation, this would use the LLM's native API + from transformers import pipeline + pipe = pipeline("text-generation", model=model) + response = pipe(prompt, max_length=1024, temperature=0.7) + return response[0]["generated_text"].strip() + except Exception as e: + logging.error(f"Error querying LLM: {e}") + # Return error message without hardcoded answers + return f"Error: {str(e)}" + +# Hybrid retriever that combines BM25 and vector search +class HybridRetriever(BaseRetriever): + """Hybrid retriever combining BM25 and vector search with configurable weights""" + + def __init__(self, vector_retriever, bm25_retriever, vector_weight=0.3): + """Initialize with separate retrievers and weights""" + super().__init__() + # Store retrievers and weights + self._vector_retriever = vector_retriever + self._bm25_retriever = bm25_retriever + self._vector_weight = vector_weight + self._bm25_weight = 1.0 - vector_weight + + def get_relevant_documents(self, query): + """Get relevant documents using weighted combination of retrievers""" + try: + # Get results from both retrievers + vector_docs = self._vector_retriever.get_relevant_documents(query) + bm25_docs = self._bm25_retriever.get_relevant_documents(query) + + # Create dictionary to track unique documents and their scores + doc_dict = {} + + # Add vector docs with their weights + for i, doc in enumerate(vector_docs): + # Score based on position (inverse rank) + score = (len(vector_docs) - i) * self._vector_weight + doc_id = doc.page_content[:50] # Use first 50 chars as a simple ID + if doc_id in doc_dict: + doc_dict[doc_id]["score"] += score + else: + doc_dict[doc_id] = {"doc": doc, "score": score} + + # Add BM25 docs with their weights + for i, doc in enumerate(bm25_docs): + # Score based on position (inverse rank) + score = (len(bm25_docs) - i) * self._bm25_weight + doc_id = doc.page_content[:50] # Use first 50 chars as a simple ID + if doc_id in doc_dict: + doc_dict[doc_id]["score"] += score + else: + doc_dict[doc_id] = {"doc": doc, "score": score} + + # Sort by combined score (highest first) + sorted_docs = sorted(doc_dict.values(), key=lambda x: x["score"], reverse=True) + + # Return just the document objects + return [item["doc"] for item in sorted_docs] + except Exception as e: + logging.error(f"Error in hybrid retrieval: {e}") + return [] + + def _get_relevant_documents(self, query): + """Required method to satisfy the abstract base class""" + return self.get_relevant_documents(query) + +# Semantic Memory class for storing and retrieving memories using embeddings +class SemanticMemory: + """Semantic memory system using embeddings and vector database""" + + def __init__(self, persist_directory): + """Initialize the semantic memory with embeddings""" + self.embeddings = HuggingFaceEmbeddings(model_name=Config.EMBEDDING_MODEL) + self.persist_directory = persist_directory + + # Create directory if it doesn't exist + os.makedirs(persist_directory, exist_ok=True) + + # Initialize or load the vector store + try: + self.memory_store = Chroma( + persist_directory=persist_directory, + embedding_function=self.embeddings + ) + logging.info(f"Loaded semantic memory from {persist_directory}") + except Exception as e: + logging.info(f"Creating new semantic memory: {e}") + self.memory_store = Chroma( + persist_directory=persist_directory, + embedding_function=self.embeddings + ) + + def add_memory(self, query, answer, 
metadata=None): + """Add a memory (query-answer pair) to the semantic memory""" + if metadata is None: + metadata = {"type": "correction", "timestamp": str(datetime.datetime.now())} + + # Create a document with the query as content and answer in metadata + document = Document( + page_content=query, + metadata={"answer": answer, **metadata} + ) + + # Add to vector store + self.memory_store.add_documents([document]) + self.memory_store.persist() + logging.info(f"Added memory for query: '{query}'") + return True + + def retrieve_memory(self, query, similarity_threshold=None): + """Retrieve most similar memory to the query""" + if similarity_threshold is None: + similarity_threshold = Config.THRESHOLDS['memory_match'] + + try: + # Search for similar queries + results = self.memory_store.similarity_search_with_score(query, k=5) + + if not results: + return None, None, 0.0 + + # Process all results to find the best match + best_doc = None + best_similarity = 0.0 + + for doc, score in results: + # Convert distance to similarity (Chroma returns distance, not similarity) + # Using a simple inverse relationship for better cross-language matching + similarity = 1.0 / (1.0 + score * 2) + + logging.info(f"Memory candidate: '{doc.page_content}' with similarity: {similarity:.4f}") + + if similarity > best_similarity: + best_similarity = similarity + best_doc = doc + + if best_similarity >= similarity_threshold: + logging.info(f"Best memory match: '{best_doc.page_content}' with similarity: {best_similarity:.4f}") + return best_doc.page_content, best_doc.metadata.get("answer"), best_similarity + else: + logging.info(f"Best memory match below threshold ({best_similarity:.4f} < {similarity_threshold})") + + return None, None, 0.0 + + except Exception as e: + logging.error(f"Error retrieving memory: {e}") + return None, None, 0.0 + + def get_all_memories(self): + """Get all memories in the system""" + try: + return self.memory_store.get() + except Exception as e: + logging.error(f"Error getting all memories: {e}") + return {"ids": [], "documents": [], "metadatas": []} + +def main(): + parser = argparse.ArgumentParser(description="Organic Chemistry Web Crawler with Advanced RAG") + + # Define command modes + mode_group = parser.add_mutually_exclusive_group(required=True) + mode_group.add_argument("--query", "-q", help="The chemistry query to search for") + mode_group.add_argument("--add-correction", action="store_true", help="Add a correction to memory") + + # Query mode parameters + parser.add_argument("--engine", choices=["duckduckgo", "arxiv", "combined"], default="combined", + help="Search engine to use (default: combined)") + parser.add_argument("--depth", type=int, default=1, + help="Crawling depth (default: 1)") + parser.add_argument("--max-pages", type=int, default=20, + help="Maximum pages to crawl (default: 20)") + parser.add_argument("--output", help="Output JSON file (default: auto-generated)") + parser.add_argument("--language", choices=["fa", "en"], default="fa", + help="Output language (default: fa for Farsi)") + + # Correction mode parameters + parser.add_argument("--incorrect", help="The incorrect query to add a correction for") + parser.add_argument("--correct", help="The correct answer for the query") + + args = parser.parse_args() + + # Configure crawler + config = Config() + config.SEARCH_ENGINE = args.engine + config.MAX_DEPTH = args.depth + config.MAX_TOTAL_PAGES = args.max_pages + config.OUTPUT_LANGUAGE = args.language + + # Create crawler + crawler = OrganicChemistryCrawler(config) + + 
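The CLI branch below drives the correction workflow; the same thing can be done programmatically, which is handy in tests. A short sketch, assuming the module is importable as organic_chemistry_crawler (the query and answer strings are purely illustrative):

```python
from organic_chemistry_crawler import OrganicChemistryCrawler, Config

crawler = OrganicChemistryCrawler(Config())

# Store a corrected answer in semantic memory (illustrative content).
crawler.add_correction(
    "what is the hybridization of carbon in benzene",
    "Each carbon atom in benzene is sp2 hybridized.",
)

# A later, semantically similar query is answered from memory first,
# before the LLM or the web crawler is consulted (see get_answer, step 1).
print(crawler.get_answer("benzene carbon hybridization?"))
```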
if args.add_correction:
+        # Add a correction to semantic memory
+        if not args.incorrect or not args.correct:
+            parser.error("Both --incorrect and --correct are required for adding a correction")
+
+        success = crawler.add_correction(args.incorrect, args.correct)
+        if success:
+            print(f"Correction added successfully for query: '{args.incorrect}'")
+        else:
+            print("Failed to add correction")
+    else:
+        # Process a query
+        query = args.query
+        print(f"\nProcessing query: {query}")
+
+        # Get answer using the agent-based approach
+        answer = crawler.get_answer(query)
+
+        # Save the results with the answer
+        confidence = 0.0
+        if "Confidence:" in answer:
+            # get_answer embeds e.g. "[Source: Web Content, Confidence: 0.82, Sources: ...]"
+            match = re.search(r"Confidence: ([\d.]+)", answer)
+            if match:
+                confidence = float(match.group(1))
+
+        output_file, answer_file = crawler.save_results(query, args.output, answer, confidence)
+
+        print(f"\nProcessing complete! Results saved to: {output_file}")
+        print(f"Found information from {len(crawler.crawled_content)} web pages.")
+        print(f"\nAnswer:")
+        print("=" * 80)
+        print(answer)
+        print("=" * 80)
+        print(f"\nFull answer saved to: {answer_file}")
+
+if __name__ == "__main__":
+    main()
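One note on query_llm: it currently falls back to a transformers text-generation pipeline, but LLM_MODEL = 'gemma3' reads like a local Ollama model tag rather than a Hugging Face model id, so that pipeline call is unlikely to resolve the model. If the intended backend is a local Ollama server, a hedged sketch of that variant (the function name is new here; the endpoint and payload follow Ollama's documented /api/generate interface, and the default URL assumes a default local install):

```python
import requests

def query_llm_ollama(prompt, model="gemma3", base_url="http://localhost:11434", timeout=120):
    """Query a locally running Ollama server instead of a transformers pipeline."""
    resp = requests.post(
        f"{base_url}/api/generate",
        json={"model": model, "prompt": prompt, "stream": False},
        timeout=timeout,
    )
    resp.raise_for_status()
    # Non-streaming responses return the full completion under the "response" key.
    return resp.json().get("response", "").strip()
```

For reference, typical runs of the script would look like "python organic_chemistry_crawler.py --query \"mechanism of SN2 reactions\" --engine combined --language en" for question answering, or "python organic_chemistry_crawler.py --add-correction --incorrect ... --correct ..." with the query and corrected answer supplied to the two flags.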