# app.py
import os
import pandas as pd
import chardet
import logging
import gradio as gr
import json
import hashlib
import numpy as np  # ADDED for easy array handling
from typing import Optional, List, Tuple, ClassVar, Dict

from sentence_transformers import SentenceTransformer, util, CrossEncoder
from langchain.llms.base import LLM
import google.generativeai as genai

# Import smolagents components
from smolagents import CodeAgent, LiteLLMModel, DuckDuckGoSearchTool, ManagedAgent, HfApiModel

###############################################################################
# 1) Logging Setup
###############################################################################
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Daily Wellness AI")


###############################################################################
# 2) API Key Handling and Enhanced GeminiLLM Class
###############################################################################
def clean_api_key(key: str) -> str:
    """Remove non-ASCII characters and strip whitespace from the API key."""
    return ''.join(c for c in key if ord(c) < 128).strip()


# Load the GEMINI API key from environment variables.
# The key is cleaned BEFORE the emptiness check so that a value made only of
# whitespace / non-ASCII characters is rejected here instead of being handed
# to the API client as an empty string.
gemini_api_key = clean_api_key(os.environ.get("GEMINI_API_KEY") or "")
if not gemini_api_key:
    logger.error("GEMINI_API_KEY environment variable not set.")
    raise EnvironmentError("Please set the GEMINI_API_KEY environment variable.")
logger.info("GEMINI API Key loaded successfully.")

# Configure Google Generative AI
try:
    genai.configure(api_key=gemini_api_key)
    logger.info("Configured Google Generative AI with provided API key.")
except Exception as e:
    logger.error(f"Failed to configure Google Generative AI: {e}")
    raise


class GeminiLLM(LLM):
    """LangChain ``LLM`` wrapper that sends prompts to a Gemini model.

    Each call builds a fresh ``genai.GenerativeModel`` and a fresh chat
    session with empty history, so no conversation state is kept between
    calls.
    """

    # Model identifier passed to genai.GenerativeModel.
    model_name: ClassVar[str] = "gemini-2.0-flash-exp"
    # Sampling parameters forwarded in the generation config.
    temperature: float = 0.7
    top_p: float = 0.95
    top_k: int = 40
    # Forwarded as "max_output_tokens".
    max_tokens: int = 2048

    @property
    def _llm_type(self) -> str:
        return "custom_gemini"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Send ``prompt`` to Gemini and return the response text.

        :param prompt: Plain-text prompt to send.
        :param stop: Optional stop sequences; forwarded to the model as
            ``stop_sequences`` when provided.
        :return: ``response.text`` of the model reply.
        :raises Exception: Any error raised by the client is logged and
            re-raised unchanged.
        """
        generation_config = {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "max_output_tokens": self.max_tokens,
        }
        if stop:
            # Previously `stop` was silently ignored. The Gemini API documents
            # a limit of 5 stop sequences, so extra entries are dropped.
            generation_config["stop_sequences"] = list(stop)[:5]

        try:
            logger.debug(f"Initializing GenerativeModel with config: {generation_config}")
            model = genai.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
            )
            logger.debug("GenerativeModel initialized successfully.")

            chat_session = model.start_chat(history=[])
            logger.debug("Chat session started.")

            # Send the prompt as plain text
            response = chat_session.send_message(prompt)
            logger.debug(f"Prompt sent to model: {prompt}")
            logger.debug(f"Raw response received: {response.text}")

            return response.text
        except Exception as e:
            logger.error(f"Error generating response with GeminiLLM: {e}")
            logger.debug("Exception details:", exc_info=True)
            raise


# Instantiate the GeminiLLM globally
llm = GeminiLLM()


###############################################################################
# 3) CSV Loading and Processing
###############################################################################
def load_csv(file_path: str) -> Tuple[List, List]:
    """Load the question/answer knowledge base from a CSV file.

    The file encoding is detected with ``chardet``; rows with a missing
    'Question' or 'Answers' value are dropped.

    :param file_path: Path to a CSV with 'Question' and 'Answers' columns.
    :return: ``(questions, answers)`` as two parallel lists. Two empty lists
        are returned when the file is missing, lacks the required columns,
        or cannot be parsed (the error is logged, not raised).
    """
    try:
        if not os.path.isfile(file_path):
            logger.error(f"CSV file does not exist: {file_path}")
            return [], []

        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
        # chardet reports None when it cannot guess (e.g. an empty file);
        # fall back to UTF-8 explicitly in that case.
        encoding = result.get('encoding') or 'utf-8'

        data = pd.read_csv(file_path, encoding=encoding)
        if 'Question' not in data.columns or 'Answers' not in data.columns:
            raise ValueError("CSV must contain 'Question' and 'Answers' columns.")

        data = data.dropna(subset=['Question', 'Answers'])
        logger.info(f"Loaded {len(data)} entries from {file_path}")
        return data['Question'].tolist(), data['Answers'].tolist()
    except Exception as e:
        logger.error(f"Error loading CSV: {e}")
        return [], []


# Path to your CSV file (ensure 'AIChatbot.csv' is in the repository)
csv_file_path = "AIChatbot.csv"
corpus_questions, corpus_answers = load_csv(csv_file_path)
if not corpus_questions:
    raise ValueError("Failed to load the knowledge base.")
###############################################################################
# 4) Sentence Embeddings & Cross-Encoder
###############################################################################
embedding_model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
try:
    embedding_model = SentenceTransformer(embedding_model_name)
    logger.info(f"Loaded embedding model: {embedding_model_name}")
except Exception as e:
    logger.error(f"Failed to load embedding model: {e}")
    raise

try:
    question_embeddings = embedding_model.encode(corpus_questions, convert_to_tensor=True)
    logger.info("Encoded question embeddings successfully.")
except Exception as e:
    logger.error(f"Failed to encode question embeddings: {e}")
    raise

cross_encoder_name = "cross-encoder/ms-marco-MiniLM-L-6-v2"
try:
    cross_encoder = CrossEncoder(cross_encoder_name)
    logger.info(f"Loaded cross-encoder model: {cross_encoder_name}")
except Exception as e:
    logger.error(f"Failed to load cross-encoder model: {e}")
    raise


###############################################################################
# 5) Retrieval + Re-Ranking
###############################################################################
class EmbeddingRetriever:
    """Two-stage retriever: embedding similarity first, cross-encoder re-rank second."""

    def __init__(self, questions, answers, embeddings, model, cross_encoder):
        self.questions = questions
        self.answers = answers
        self.embeddings = embeddings
        self.model = model
        self.cross_encoder = cross_encoder

    def retrieve(self, query: str, top_k: int = 3):
        """Return the best-matching answers for ``query``.

        :param query: The user's question.
        :param top_k: Number of candidates kept after the embedding stage.
        :return: List of ``(answer, cross_encoder_score)`` tuples sorted by
            score, highest first. The score is whatever
            ``cross_encoder.predict`` returns, not the cosine similarity.
            An empty list is returned on any error (the error is logged).
        """
        try:
            # Stage 1: cosine similarity of the query against every stored question.
            query_embedding = self.model.encode(query, convert_to_tensor=True)
            scores = util.pytorch_cos_sim(query_embedding, self.embeddings)[0].cpu().tolist()
            scored_data = sorted(
                zip(self.questions, self.answers, scores),
                key=lambda x: x[2],
                reverse=True,
            )[:top_k]

            # Stage 2: re-score (query, candidate question) pairs with the cross-encoder.
            cross_inputs = [[query, candidate[0]] for candidate in scored_data]
            cross_scores = self.cross_encoder.predict(cross_inputs)
            reranked = sorted(zip(scored_data, cross_scores), key=lambda x: x[1], reverse=True)

            # Keep only the answer text and its cross-encoder score.
            final_retrieved = [(entry[0][1], entry[1]) for entry in reranked]
            logger.debug(f"Retrieved and reranked answers: {final_retrieved}")
            return final_retrieved
        except Exception as e:
            logger.error(f"Error during retrieval: {e}")
            logger.debug("Exception details:", exc_info=True)
            return []


retriever = EmbeddingRetriever(corpus_questions, corpus_answers, question_embeddings, embedding_model, cross_encoder)


###############################################################################
# 6) Sanity Check Tool
###############################################################################
class QuestionSanityChecker:
    """Asks the LLM whether a question is about daily wellness."""

    # Characters stripped from each token before comparing with "yes"/"no".
    _TOKEN_PUNCTUATION = ".,!?;:'\"*()[]"

    def __init__(self, llm: GeminiLLM):
        self.llm = llm

    def is_relevant(self, question: str) -> bool:
        """Return True only when the LLM clearly answers 'Yes'.

        The reply is compared word by word rather than by substring, so a
        word such as "not" or "know" is no longer mistaken for "no".

        :param question: The user's question.
        :return: True for an unambiguous yes; False for no, for an ambiguous
            reply, or when the LLM call fails.
        """
        prompt = (
            f"You are an assistant that determines whether a question is relevant to daily wellness.\n\n"
            f"Question: {question}\n\n"
            f"Is the above question relevant to daily wellness? Respond with 'Yes' or 'No' only."
        )
        try:
            response = self.llm._call(prompt)
            words = {
                token.strip(self._TOKEN_PUNCTUATION)
                for token in response.lower().split()
            }
            is_yes = 'yes' in words
            is_no = 'no' in words
            logger.debug(f"Sanity check response: '{response}', interpreted as is_yes={is_yes}, is_no={is_no}")

            if is_yes and not is_no:
                return True
            if is_no and not is_yes:
                return False

            # Ambiguous response
            logger.warning(f"Sanity check ambiguous response: '{response}'. Defaulting to 'No'.")
            return False
        except Exception as e:
            logger.error(f"Error in sanity check: {e}")
            logger.debug("Exception details:", exc_info=True)
            return False


# Instantiate the sanity checker globally
sanity_checker = QuestionSanityChecker(llm)

###############################################################################
# 7) smolagents Integration: Model and Web Search
###############################################################################
# Initialize the smolagents model. HfApiModel is constructed with its defaults
# here (the file also imports LiteLLMModel, but it is not used).
smol_model = HfApiModel()

# Instantiate the DuckDuckGo search tool
search_tool = DuckDuckGoSearchTool()

# Create the web agent with the search tool
web_agent = CodeAgent(
    tools=[search_tool],
    model=smol_model
)

# Define the managed web agent
managed_web_agent = ManagedAgent(
    agent=web_agent,
    name="web_search",
    description="Runs a web search for you. Provide your query as an argument."
)

# Create the manager agent with managed web agent and additional tools if needed
manager_agent = CodeAgent(
    tools=[],  # Add additional tools here if required
    model=smol_model,
    managed_agents=[managed_web_agent]
)


###############################################################################
# 8) Answer Expansion
###############################################################################
class AnswerExpander:
    """Turns several retrieved answers into one LLM-written response."""

    def __init__(self, llm: GeminiLLM):
        self.llm = llm

    def expand(self, query: str, retrieved_answers: List[str], detail: bool = False) -> str:
        """
        Synthesize answers into a single cohesive response.
        If detail=True, provide a more detailed response.

        :param query: The user's question.
        :param retrieved_answers: Answer texts to use as references.
        :param detail: Ask for an in-depth answer instead of a short one.
        :return: The stripped LLM response, or a fixed apology string when
            the LLM call fails.
        """
        try:
            # enumerate already starts at 1; the previous "idx+1" made the
            # reference list start at 2.
            reference_block = "\n".join(
                f"- {idx}) {ans}" for idx, ans in enumerate(retrieved_answers, start=1)
            )

            # ADDED: More elaboration if detail=True
            detail_instructions = (
                "Provide a thorough, in-depth explanation, adding relevant tips and context, "
                "while remaining creative and brand-aligned. "
                if detail else
                "Provide a concise response in no more than 4 sentences."
            )

            prompt = (
                f"You are Daily Wellness AI, a friendly wellness expert. Below are multiple "
                f"potential answers retrieved from a local knowledge base. You have a user question.\n\n"
                f"Question: {query}\n\n"
                f"Retrieved Answers:\n{reference_block}\n\n"
                f"Please synthesize these references into a single cohesive, creative, and brand-aligned response. "
                f"{detail_instructions} "
                f"End with a short inspirational note.\n\n"
                "Disclaimer: This is general wellness information, not a substitute for professional medical advice."
            )

            logger.debug(f"Generated prompt for answer expansion: {prompt}")
            response = self.llm._call(prompt)
            logger.debug(f"Expanded answer: {response}")
            return response.strip()
        except Exception as e:
            logger.error(f"Error expanding answer: {e}")
            logger.debug("Exception details:", exc_info=True)
            return "Sorry, an error occurred while generating a response."


answer_expander = AnswerExpander(llm)

###############################################################################
# 9) Persistent Cache (ADDED)
###############################################################################
CACHE_FILE = "query_cache.json"
SIMILARITY_THRESHOLD_CACHE = 0.8  # Adjust for how close a query must be to reuse cache


def load_cache() -> Dict:
    """Load the cache from the local JSON file.

    :return: The cached dictionary, or an empty dict when the file is
        missing, unreadable, or does not contain a JSON object.
    """
    if not os.path.isfile(CACHE_FILE):
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load cache file: {e}")
        return {}
    if not isinstance(loaded, dict):
        # Callers index the cache by key, so anything but an object is unusable.
        logger.error("Failed to load cache file: top-level JSON value is not an object.")
        return {}
    return loaded


def save_cache(cache_data: Dict):
    """Save the cache dictionary to a local JSON file.

    The data is written to a temporary sibling file and then moved over
    ``CACHE_FILE`` so a failed write cannot leave a truncated cache behind.
    Errors are logged, not raised.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cache_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, CACHE_FILE)
    except Exception as e:
        logger.error(f"Failed to save cache file: {e}")


def compute_hash(text: str) -> str:
    """Compute a simple hash for the text to handle duplicates in a consistent way."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


# ADDED: Load cache at startup
cache_store = load_cache()
###############################################################################
# 9.1) Utility to attempt cached retrieval (ADDED)
###############################################################################
def get_cached_answer(query: str) -> Optional[str]:
    """
    Returns a cached answer if there's a very similar query in the cache.
    We'll compare embeddings to find if a stored query is above threshold.

    :param query: The incoming user question.
    :return: The answer of the most similar cached query when its cosine
        similarity is at least ``SIMILARITY_THRESHOLD_CACHE``, else None.
    """
    if not cache_store:
        return None

    # Compute embedding for the incoming query. It is moved to the CPU because
    # the stored embeddings are rebuilt from JSON lists as NumPy (CPU) arrays;
    # comparing them with a tensor on another device would fail.
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu()

    # Check all cached items
    best_score = 0.0
    best_answer = None

    for cache_data in cache_store.values():
        # Skip entries that do not have the fields written by store_in_cache.
        if not isinstance(cache_data, dict):
            continue
        if "embedding" not in cache_data or "answer" not in cache_data:
            continue

        stored_embedding = np.array(cache_data["embedding"], dtype=np.float32)
        score = util.pytorch_cos_sim(query_embedding, stored_embedding)[0].item()
        if score > best_score:
            best_score = score
            best_answer = cache_data["answer"]

    if best_score >= SIMILARITY_THRESHOLD_CACHE:
        logger.info(f"Cache hit! Similarity: {best_score:.2f}, returning cached answer.")
        return best_answer
    return None


def store_in_cache(query: str, answer: str):
    """
    Store a query-answer pair in the cache with the query's embedding.

    The entry is keyed by the hash of the query text, added to the in-memory
    ``cache_store`` and the whole cache is then written to disk.
    """
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().tolist()
    cache_key = compute_hash(query)
    cache_store[cache_key] = {
        "query": query,
        "answer": answer,
        "embedding": query_embedding
    }
    save_cache(cache_store)


###############################################################################
# 10) Query Handling
###############################################################################
# Minimum top retrieval score for answering from the knowledge base; below it
# the query goes to web search. Adjust this threshold based on empirical results.
KB_SIMILARITY_THRESHOLD = 0.3


def handle_query(query: str, detail: bool = False) -> str:
    """
    Main function to process the query.
    :param query: The user's question.
    :param detail: Whether the user wants a more detailed response.
    :return: Response string from Daily Wellness AI.

    Flow: relevance check -> knowledge-base retrieval -> cache lookup ->
    answer from the knowledge base (or from web search when the top score is
    below ``KB_SIMILARITY_THRESHOLD``). A cached answer, when present, is
    blended into the new answer by the LLM. Every produced answer is stored
    in the cache. Any exception is logged and turned into a fixed error string.
    """
    if not isinstance(query, str) or not query.strip():
        return "Please provide a valid question."

    try:
        # 1) Sanity Check: Determine if the question is relevant to daily wellness
        if not sanity_checker.is_relevant(query):
            return "Your question seems out of context or not related to daily wellness. Please ask a wellness-related question."

        # 2) Proceed with retrieval from the knowledge base
        retrieved = retriever.retrieve(query)

        # 3) Check the cache
        cached_answer = get_cached_answer(query)

        # 4) If no retrieved data from the knowledge base
        if not retrieved:
            # If we do have a cached answer, return it
            if cached_answer:
                logger.info("No relevant entries found in knowledge base. Returning cached answer.")
                return cached_answer
            # Otherwise, no KB results and no cache => no answer
            return "I'm sorry, I couldn't find an answer to your question."

        # 5) We have retrieved data; let's check for similarity threshold
        top_score = retrieved[0][1]  # Assuming the list is sorted descending

        if top_score < KB_SIMILARITY_THRESHOLD:
            # (Low similarity) Perform web search using manager_agent
            logger.info("Similarity score below threshold. Performing web search.")
            web_search_response = manager_agent.run(query)
            logger.debug(f"Web search response: {web_search_response}")

            # Combine any cached answer (if it exists) with the web result
            if cached_answer:
                blend_prompt = (
                    f"Combine the following previous answer with the new web results to create a more creative and accurate response. "
                    f"Do not include any of the previous prompt or instructions in your response. "
                    f"Add positivity and conclude with a short inspirational note.\n\n"
                    f"Previous Answer:\n{cached_answer}\n\n"
                    f"Web Results:\n{web_search_response}"
                )
                final_answer = llm._call(blend_prompt).strip()
            else:
                # If no cache, just return the web response
                final_answer = (
                    f"**Daily Wellness AI**\n\n"
                    f"{web_search_response}\n\n"
                    "Disclaimer: This information is retrieved from the web and is not a substitute for professional medical advice.\n\n"
                    "Wishing you a calm and wonderful day!"
                )

            # Store in cache
            store_in_cache(query, final_answer)
            return final_answer

        # 6) If similarity is sufficient, we will finalize an answer from the knowledge base
        responses = [ans for ans, _score in retrieved]

        # 6a) If we have a cached answer, let's blend it with the new knowledge base data
        if cached_answer:
            blend_prompt = (
                f"Combine the previous answer with the newly retrieved answers to enhance creativity and accuracy. "
                f"Do not include any of the previous prompt or instructions in your response. "
                f"Add new insights, creativity, and conclude with a short inspirational note.\n\n"
                f"Previous Answer:\n{cached_answer}\n\n"
                f"New Retrieved Answers:\n" + "\n".join(f"- {r}" for r in responses)
            )
            final_answer = llm._call(blend_prompt).strip()
        else:
            # 6b) No cache => proceed with normal expansions
            final_answer = answer_expander.expand(query, responses, detail=detail)

        # 7) Store new or blended answer in cache
        store_in_cache(query, final_answer)
        return final_answer

    except Exception as e:
        logger.error(f"Error handling query: {e}")
        logger.debug("Exception details:", exc_info=True)
        return "An error occurred while processing your request."


###############################################################################
# 11) Gradio Interface
###############################################################################
def gradio_interface(query: str, detail: bool):
    """
    Gradio interface function that optionally takes a detail parameter for longer responses.

    :param query: Text entered in the textbox.
    :param detail: State of the "In-Depth Answer?" checkbox.
    :return: The response from ``handle_query`` (already formatted), or a
        fixed Markdown error message if it raises.
    """
    try:
        return handle_query(query, detail=detail)
    except Exception as e:
        logger.error(f"Error in Gradio interface: {e}")
        logger.debug("Exception details:", exc_info=True)
        return "**An error occurred while processing your request. Please try again later.**"


# ADDED: We now have a checkbox for detail in the Gradio UI
interface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(
            lines=2,
            placeholder="e.g., What is box breathing?",
            label="Ask Daily Wellness AI"
        ),
        gr.Checkbox(
            label="In-Depth Answer?",
            value=False,
            info="Check for a longer, more detailed response."
        )
    ],
    outputs=gr.Markdown(label="Answer from Daily Wellness AI"),
    title="Daily Wellness AI",
    description="Ask wellness-related questions and receive synthesized, creative answers. Optionally request a more in-depth response.",
    theme="default",
    examples=[
        ["What is box breathing and how does it help reduce anxiety?", True],
        ["Provide a daily wellness schedule incorporating box breathing techniques.", False],
        ["What are some tips for maintaining good posture while working at a desk?", True],
        ["Who is the CEO of Hugging Face?", False]  # Example of an out-of-context question
    ],
    allow_flagging="never"
)

###############################################################################
# 12) Launch Gradio
###############################################################################
if __name__ == "__main__":
    try:
        # share=True asks Gradio to create a public share link.
        # NOTE(review): confirm this is wanted on the target host.
        interface.launch(server_name="0.0.0.0", server_port=7860, debug=False, share=True)
    except Exception as e:
        logger.error(f"Failed to launch Gradio interface: {e}")
        logger.debug("Exception details:", exc_info=True)