# --- Dependencies ---
# pip install langchain langchain-core langchain-community langchain-ollama faiss-cpu sentence-transformers
# pip install chromadb  # Example if switching later
import datetime
import os
import argparse  # Command-line arguments for the CLI entry point

# (REMOVE LATER - LLM Related)
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_community.vectorstores import FAISS
# (REMOVE LATER - LLM Related)
from langchain.prompts import PromptTemplate
# (REMOVE LATER - LLM Related)
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
# (REMOVE LATER - LLM Related)
from langchain_core.output_parsers import StrOutputParser
from langchain.schema import Document

# --- Config ---
FAISS_INDEX_PATH = "my_chatbot_memory_index_user_specific"  # Directory for FAISS index

# --- Ollama LLM & Embeddings Setup ---
# (REMOVE LATER - LLM Related)
OLLAMA_LLM_MODEL = 'gemma3'
OLLAMA_EMBED_MODEL = 'nomic-embed-text'  # Keep embeddings even if LLM removed

# Text and owner of the single document used to bootstrap an empty FAISS store.
_PLACEHOLDER_TEXT = "system_placeholder_init"
_PLACEHOLDER_USER_ID = "system_init"

# Global variables for embeddings and vector store (set by initialize_memory())
embeddings = None
vectorstore = None
retriever = None


def initialize_memory():
    """Initialize the Ollama embeddings and load or create the FAISS store.

    Populates the module-level ``embeddings``, ``vectorstore`` and
    ``retriever`` globals. When both ``embeddings`` and ``vectorstore`` are
    already set the call returns immediately.

    Raises:
        Exception: Any error raised while creating the embeddings client or
            while loading/creating the FAISS index. A hint is printed first
            and the original exception is re-raised unchanged.
    """
    global embeddings, vectorstore, retriever

    if embeddings is not None and vectorstore is not None:
        return  # Prevent re-initialization

    try:
        embeddings = OllamaEmbeddings(model=OLLAMA_EMBED_MODEL)
        print(f"Successfully initialized Ollama Embeddings: '{OLLAMA_EMBED_MODEL}'")
    except Exception as e:
        print(f"Error initializing Ollama Embeddings: {e}")
        print(f"Ensure Ollama Embedding model is running & pulled ('ollama pull {OLLAMA_EMBED_MODEL}').")
        raise  # Bare raise keeps the original traceback for the caller

    try:
        # Check for the actual index file, not just the directory.
        index_file = os.path.join(FAISS_INDEX_PATH, "index.faiss")
        if os.path.exists(index_file):
            print(f"Loading existing FAISS index from: {FAISS_INDEX_PATH}")
            # SECURITY: this flag allows pickle-based deserialization of the
            # saved store. Only load index directories written by this module;
            # never point FAISS_INDEX_PATH at untrusted data.
            vectorstore = FAISS.load_local(
                FAISS_INDEX_PATH,
                embeddings,
                allow_dangerous_deserialization=True,
            )
            print("FAISS vector store loaded successfully.")
        else:
            print(f"No FAISS index found at {FAISS_INDEX_PATH}. Initializing new store.")
            # FAISS needs at least one text to initialize, so seed it with a
            # placeholder that get_formatted_context() never returns.
            vectorstore = FAISS.from_texts(
                [_PLACEHOLDER_TEXT],
                embeddings,
                metadatas=[{"user_id": _PLACEHOLDER_USER_ID}],
            )
            vectorstore.save_local(FAISS_INDEX_PATH)
            print("New FAISS vector store initialized and saved.")

        # Default retriever, created once the store exists.
        retriever = vectorstore.as_retriever(search_kwargs=dict(k=5))
    except Exception as e:
        print(f"Error initializing/loading FAISS: {e}")
        print("Check permissions or delete the index directory if corrupted.")
        raise


def add_memory(user_id: str, user_input: str, assistant_output: str) -> bool:
    """Store one user/assistant exchange in the memory of a specific user.

    The exchange is saved as two documents (roles ``"user"`` and
    ``"assistant"``) that share the same ``user_id`` and ``timestamp``
    metadata, and the index is persisted to ``FAISS_INDEX_PATH`` afterwards.

    Args:
        user_id: Owner of the memory.
        user_input: The user's message.
        assistant_output: The assistant's reply.

    Returns:
        True when both documents were added and the index was saved; False
        when memory is not initialized, an argument is empty, or FAISS raised
        (a message is printed in each case).
    """
    if vectorstore is None:
        print("Error: Memory not initialized. Call initialize_memory() first.")
        return False

    if not (user_input and assistant_output and user_id):
        print("Warning: Attempted to add memory with missing user_id, user_input, or assistant_output.")
        return False

    # Local wall-clock time without timezone information.
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    docs_to_add = [
        Document(
            page_content=f"{user_input}",
            metadata={"user_id": user_id, "role": "user", "timestamp": timestamp},
        ),
        Document(
            page_content=f"{assistant_output}",
            metadata={"user_id": user_id, "role": "assistant", "timestamp": timestamp},
        ),
    ]

    try:
        vectorstore.add_documents(docs_to_add)
        vectorstore.save_local(FAISS_INDEX_PATH)  # Persist index after adding
    except Exception as e:
        print(f"Error adding document to FAISS for user {user_id}: {e}")
        return False
    return True


def get_formatted_context(user_id: str, query: str, k: int = 3) -> str:
    """Retrieve up to ``k`` memories of one user relevant to ``query``.

    Args:
        user_id: Only documents whose ``user_id`` metadata equals this value
            are returned.
        query: Text used for the similarity search.
        k: Maximum number of memories to return.

    Returns:
        One ``"<Role>: <content>"`` line per memory, separated by newlines.
        When nothing matches, when memory is not initialized, or when the
        search fails, a fixed Persian status message is returned instead
        (errors are also printed).
    """
    if vectorstore is None:
        print("Error: Memory not initialized. Call initialize_memory() first.")
        return "خطا: حافظه مقداردهی اولیه نشده است."

    try:
        # Filter by user inside the search rather than after it: otherwise
        # other users' documents can fill the top hits and starve this user.
        # FAISS applies the filter to the `fetch_k` nearest candidates, so
        # fetch_k is kept well above k.
        docs = vectorstore.similarity_search(
            query,
            k=k,
            filter={"user_id": user_id},
            fetch_k=max(20, k * 10),
        )

        formatted = []
        for doc in docs:
            # Defensive re-check of ownership; memories must never leak
            # between users even if the store-side filter misbehaves.
            if doc.metadata.get("user_id") != user_id:
                continue
            content = doc.page_content
            if not content or content == _PLACEHOLDER_TEXT:
                continue
            role = doc.metadata.get("role", "unknown")
            formatted.append(f"{role.capitalize()}: {content}")

        if not formatted:
            return "هیچ خاطره مرتبطی از جلسات گذشته برای این کاربر یافت نشد."
        # A real newline: the previous "\\n" produced a literal backslash-n.
        return "\n".join(formatted)
    except Exception as e:
        print(f"Error retrieving context from FAISS for user {user_id}: {e}")
        return "خطا در بازیابی زمینه حافظه."
# --- Main Execution (Command-Line Interface with Logging) ---
def build_log_filename(user_id, action):
    """Return the log file name for one CLI invocation.

    Characters that are path separators or otherwise invalid in file names
    are replaced with ``_`` so a user ID such as ``"../x"`` cannot redirect
    the log outside the working directory or make the write fail.
    """
    import re  # Local import: only the CLI path needs it

    safe_user_id = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", str(user_id))
    return f"cli_log_{safe_user_id}_{action}.txt"


def main(argv=None):
    """Run the memory CLI: parse arguments, perform the action, write a log.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    The outcome is printed and also written to the file named by
    ``build_log_filename``, whether the action succeeded or failed. Errors
    raised by the action are caught and logged with their traceback rather
    than propagated.
    """
    import traceback

    parser = argparse.ArgumentParser(description="Memory module CLI tool with logging")
    parser.add_argument("--action", required=True, choices=['add', 'get'],
                        help="Action to perform: add or get memory.")
    parser.add_argument("--user-id", default="default_user",
                        help="User ID for the memory operation (optional, defaults to 'default_user')")
    parser.add_argument("--input", required=True,
                        help="Input text (user query for 'get', user message for 'add')")
    parser.add_argument("--output",
                        help="Output text (assistant response, required for 'add' action)")
    parser.add_argument("--k", type=int, default=3,
                        help="Number of memories to retrieve for 'get' action (optional, defaults to 3)")
    args = parser.parse_args(argv)

    log_filename = build_log_filename(args.user_id, args.action)
    log_content = [
        f"Timestamp: {datetime.datetime.now()}",
        f"Action: {args.action}",
        f"User ID: {args.user_id}",
        f"Input: {args.input}",
    ]
    if args.action == 'add':
        log_content.append(f"Output (for add): {args.output}")
    if args.action == 'get':
        log_content.append(f"K (for get): {args.k}")
    log_content.append("---")

    try:
        # Validate before touching the memory system, so a bad invocation
        # neither contacts Ollama nor creates an index on disk.
        if args.action == 'add' and not args.output:
            error_msg = "Error: --output is required for action 'add'."
            print(error_msg)
            log_content.append(f"Result: FAILURE - {error_msg}")
            return  # The finally clause still writes the log

        print("Initializing memory system...")
        initialize_memory()
        print("Memory system initialized.")

        if args.action == 'add':
            if add_memory(args.user_id, args.input, args.output):
                result_msg = f"Successfully added memory for user '{args.user_id}'."
                print(result_msg)
                log_content.append(f"Result: SUCCESS - {result_msg}")
            else:
                result_msg = f"Failed to add memory for user '{args.user_id}'. Check logs for details."
                print(result_msg)
                log_content.append(f"Result: FAILURE - {result_msg}")
        elif args.action == 'get':
            context = get_formatted_context(args.user_id, args.input, args.k)
            # Print the raw context to standard output for immediate feedback
            print("--- Retrieved Context --- ")
            print(context)
            print("-------------------------")
            log_content.append("Result:")
            log_content.append(context)
    except Exception as e:
        error_msg = f"An error occurred during execution: {e}"
        print(error_msg)
        log_content.append(f"Result: FAILURE - {error_msg}")
        log_content.append(traceback.format_exc())
    finally:
        # Write the log file regardless of success or failure (if possible)
        try:
            with open(log_filename, 'w', encoding='utf-8') as f:
                # Real newlines: "\\n" wrote a literal backslash-n and put
                # the whole log on one line.
                f.write("\n".join(log_content))
            print(f"--- Log saved to: {log_filename} ---")
        except Exception as log_e:
            print(f"Error writing log file {log_filename}: {log_e}")


if __name__ == "__main__":
    main()
# --- End of Script ---