# app.py
import os
import pandas as pd
import chardet
import logging
import gradio as gr
import json
import hashlib
import numpy as np # ADDED for easy array handling
from typing import Optional, List, Tuple, ClassVar, Dict
from sentence_transformers import SentenceTransformer, util, CrossEncoder
from langchain.llms.base import LLM
import google.generativeai as genai
# Import smolagents components
from smolagents import CodeAgent, LiteLLMModel, DuckDuckGoSearchTool, ManagedAgent, HfApiModel
###############################################################################
# 1) Logging Setup
###############################################################################
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Daily Wellness AI")
###############################################################################
# 2) API Key Handling and Enhanced GeminiLLM Class
###############################################################################
def clean_api_key(key: str) -> str:
    """Remove non-ASCII characters and strip whitespace from the API key."""
    return ''.join(c for c in key if ord(c) < 128).strip()
# Load the GEMINI API key from environment variables
gemini_api_key = os.environ.get("GEMINI_API_KEY")
if not gemini_api_key:
    logger.error("GEMINI_API_KEY environment variable not set.")
    raise EnvironmentError("Please set the GEMINI_API_KEY environment variable.")
gemini_api_key = clean_api_key(gemini_api_key)
logger.info("GEMINI API Key loaded successfully.")
# Configure Google Generative AI
try:
    genai.configure(api_key=gemini_api_key)
    logger.info("Configured Google Generative AI with provided API key.")
except Exception as e:
    logger.error(f"Failed to configure Google Generative AI: {e}")
    raise e
class GeminiLLM(LLM):
    model_name: ClassVar[str] = "gemini-2.0-flash-exp"
    temperature: float = 0.7
    top_p: float = 0.95
    top_k: int = 40
    max_tokens: int = 2048

    @property
    def _llm_type(self) -> str:
        return "custom_gemini"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        generation_config = {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "max_output_tokens": self.max_tokens,
        }
        try:
            logger.debug(f"Initializing GenerativeModel with config: {generation_config}")
            model = genai.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
            )
            logger.debug("GenerativeModel initialized successfully.")
            chat_session = model.start_chat(history=[])
            logger.debug("Chat session started.")
            # Send the prompt as plain text
            response = chat_session.send_message(prompt)
            logger.debug(f"Prompt sent to model: {prompt}")
            logger.debug(f"Raw response received: {response.text}")
            return response.text
        except Exception as e:
            logger.error(f"Error generating response with GeminiLLM: {e}")
            logger.debug("Exception details:", exc_info=True)
            raise e
# Instantiate the GeminiLLM globally
llm = GeminiLLM()
###############################################################################
# 3) CSV Loading and Processing
###############################################################################
def load_csv(file_path: str):
    try:
        if not os.path.isfile(file_path):
            logger.error(f"CSV file does not exist: {file_path}")
            return [], []
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
            encoding = result['encoding']
        data = pd.read_csv(file_path, encoding=encoding)
        if 'Question' not in data.columns or 'Answers' not in data.columns:
            raise ValueError("CSV must contain 'Question' and 'Answers' columns.")
        data = data.dropna(subset=['Question', 'Answers'])
        logger.info(f"Loaded {len(data)} entries from {file_path}")
        return data['Question'].tolist(), data['Answers'].tolist()
    except Exception as e:
        logger.error(f"Error loading CSV: {e}")
        return [], []
# Path to your CSV file (ensure 'AIChatbot.csv' is in the repository)
csv_file_path = "AIChatbot.csv"
corpus_questions, corpus_answers = load_csv(csv_file_path)
if not corpus_questions:
    raise ValueError("Failed to load the knowledge base.")
###############################################################################
# 4) Sentence Embeddings & Cross-Encoder
###############################################################################
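# The bi-encoder indexes the corpus questions once at startup; the cross-encoder
# is applied per query to re-score the shortlisted candidates.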
embedding_model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
try:
    embedding_model = SentenceTransformer(embedding_model_name)
    logger.info(f"Loaded embedding model: {embedding_model_name}")
except Exception as e:
    logger.error(f"Failed to load embedding model: {e}")
    raise e
try:
    question_embeddings = embedding_model.encode(corpus_questions, convert_to_tensor=True)
    logger.info("Encoded question embeddings successfully.")
except Exception as e:
    logger.error(f"Failed to encode question embeddings: {e}")
    raise e
cross_encoder_name = "cross-encoder/ms-marco-MiniLM-L-6-v2"
try:
    cross_encoder = CrossEncoder(cross_encoder_name)
    logger.info(f"Loaded cross-encoder model: {cross_encoder_name}")
except Exception as e:
    logger.error(f"Failed to load cross-encoder model: {e}")
    raise e
###############################################################################
# 5) Retrieval + Re-Ranking
###############################################################################
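# Two-stage retrieval: cosine similarity over the precomputed question embeddings
# produces a top-k shortlist, which the cross-encoder then re-ranks against the query.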
class EmbeddingRetriever:
    def __init__(self, questions, answers, embeddings, model, cross_encoder):
        self.questions = questions
        self.answers = answers
        self.embeddings = embeddings
        self.model = model
        self.cross_encoder = cross_encoder

    def retrieve(self, query: str, top_k: int = 3):
        try:
            query_embedding = self.model.encode(query, convert_to_tensor=True)
            scores = util.pytorch_cos_sim(query_embedding, self.embeddings)[0].cpu().tolist()
            scored_data = sorted(zip(self.questions, self.answers, scores), key=lambda x: x[2], reverse=True)[:top_k]
            # Re-score the shortlist with the cross-encoder (query paired with each candidate question)
            cross_inputs = [[query, candidate[0]] for candidate in scored_data]
            cross_scores = self.cross_encoder.predict(cross_inputs)
            reranked = sorted(zip(scored_data, cross_scores), key=lambda x: x[1], reverse=True)
            # Return (answer, cross-encoder score) pairs, best first
            final_retrieved = [(entry[0][1], entry[1]) for entry in reranked]
            logger.debug(f"Retrieved and reranked answers: {final_retrieved}")
            return final_retrieved
        except Exception as e:
            logger.error(f"Error during retrieval: {e}")
            logger.debug("Exception details:", exc_info=True)
            return []
retriever = EmbeddingRetriever(corpus_questions, corpus_answers, question_embeddings, embedding_model, cross_encoder)
###############################################################################
# 6) Sanity Check Tool
###############################################################################
class QuestionSanityChecker:
    def __init__(self, llm: GeminiLLM):
        self.llm = llm

    def is_relevant(self, question: str) -> bool:
        prompt = (
            f"You are an assistant that determines whether a question is relevant to daily wellness.\n\n"
            f"Question: {question}\n\n"
            f"Is the above question relevant to daily wellness? Respond with 'Yes' or 'No' only."
        )
        try:
            response = self.llm._call(prompt)
            is_yes = 'yes' in response.lower()
            is_no = 'no' in response.lower()
            logger.debug(f"Sanity check response: '{response}', interpreted as is_yes={is_yes}, is_no={is_no}")
            if is_yes and not is_no:
                return True
            elif is_no and not is_yes:
                return False
            else:
                # Ambiguous response
                logger.warning(f"Sanity check ambiguous response: '{response}'. Defaulting to 'No'.")
                return False
        except Exception as e:
            logger.error(f"Error in sanity check: {e}")
            logger.debug("Exception details:", exc_info=True)
            return False
# Instantiate the sanity checker globally
sanity_checker = QuestionSanityChecker(llm)
###############################################################################
# 7) smolagents Integration: Web Search Agent
###############################################################################
# Initialize the smolagents model (Hugging Face Inference API)
smol_model = HfApiModel()
# Instantiate the DuckDuckGo search tool
search_tool = DuckDuckGoSearchTool()
# Create the web agent with the search tool
web_agent = CodeAgent(
    tools=[search_tool],
    model=smol_model
)
# Define the managed web agent
managed_web_agent = ManagedAgent(
    agent=web_agent,
    name="web_search",
    description="Runs a web search for you. Provide your query as an argument."
)
# Create the manager agent with the managed web agent and additional tools if needed
manager_agent = CodeAgent(
    tools=[],  # Add additional tools here if required
    model=smol_model,
    managed_agents=[managed_web_agent]
)
###############################################################################
# 8) Answer Expansion
###############################################################################
class AnswerExpander:
    def __init__(self, llm: GeminiLLM):
        self.llm = llm

    def expand(self, query: str, retrieved_answers: List[str], detail: bool = False) -> str:
        """
        Synthesize answers into a single cohesive response.
        If detail=True, provide a more detailed response.
        """
        try:
            # enumerate starts at 1, so idx is already the 1-based reference number
            reference_block = "\n".join(
                f"- {idx}) {ans}" for idx, ans in enumerate(retrieved_answers, start=1)
            )
            # ADDED: More elaboration if detail=True
            detail_instructions = (
                "Provide a thorough, in-depth explanation, adding relevant tips and context, "
                "while remaining creative and brand-aligned. "
                if detail else
                "Provide a concise response in no more than 4 sentences."
            )
            prompt = (
                f"You are Daily Wellness AI, a friendly wellness expert. Below are multiple "
                f"potential answers retrieved from a local knowledge base. You have a user question.\n\n"
                f"Question: {query}\n\n"
                f"Retrieved Answers:\n{reference_block}\n\n"
                f"Please synthesize these references into a single cohesive, creative, and brand-aligned response. "
                f"{detail_instructions} "
                f"End with a short inspirational note.\n\n"
                "Disclaimer: This is general wellness information, not a substitute for professional medical advice."
            )
            logger.debug(f"Generated prompt for answer expansion: {prompt}")
            response = self.llm._call(prompt)
            logger.debug(f"Expanded answer: {response}")
            return response.strip()
        except Exception as e:
            logger.error(f"Error expanding answer: {e}")
            logger.debug("Exception details:", exc_info=True)
            return "Sorry, an error occurred while generating a response."
answer_expander = AnswerExpander(llm)
###############################################################################
# 9) Persistent Cache (ADDED)
###############################################################################
CACHE_FILE = "query_cache.json"
SIMILARITY_THRESHOLD_CACHE = 0.8 # Adjust for how close a query must be to reuse cache
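# Each cache entry is keyed by an MD5 hash of the query and stores the original
# query text, the final answer, and the query embedding as a plain Python list,
# so the whole store serializes cleanly to JSON.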
def load_cache() -> Dict:
    """Load the cache from the local JSON file."""
    if os.path.isfile(CACHE_FILE):
        try:
            with open(CACHE_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load cache file: {e}")
            return {}
    return {}
def save_cache(cache_data: Dict):
    """Save the cache dictionary to a local JSON file."""
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(cache_data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"Failed to save cache file: {e}")

def compute_hash(text: str) -> str:
    """Compute a simple hash of the text so duplicate queries map to the same key."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()
# ADDED: Load cache at startup
cache_store = load_cache()
###############################################################################
# 9.1) Utility to attempt cached retrieval (ADDED)
###############################################################################
def get_cached_answer(query: str) -> Optional[str]:
    """
    Return a cached answer if a sufficiently similar query exists in the cache,
    judged by embedding cosine similarity against the stored queries.
    """
    if not cache_store:
        return None
    # Compute embedding for the incoming query (moved to CPU so it can be compared
    # against the stored embeddings, which are loaded as CPU arrays)
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu()
    # Check all cached items
    best_score = 0.0
    best_answer = None
    for cached_q, cache_data in cache_store.items():
        stored_embedding = np.array(cache_data["embedding"], dtype=np.float32)
        score = util.pytorch_cos_sim(query_embedding, stored_embedding)[0].item()
        if score > best_score:
            best_score = score
            best_answer = cache_data["answer"]
    if best_score >= SIMILARITY_THRESHOLD_CACHE:
        logger.info(f"Cache hit! Similarity: {best_score:.2f}, returning cached answer.")
        return best_answer
    return None
def store_in_cache(query: str, answer: str):
    """
    Store a query-answer pair in the cache with the query's embedding.
    """
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().tolist()
    cache_key = compute_hash(query)
    cache_store[cache_key] = {
        "query": query,
        "answer": answer,
        "embedding": query_embedding
    }
    save_cache(cache_store)
###############################################################################
# 10) Query Handling
###############################################################################
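# Flow: relevance check -> knowledge-base retrieval -> cache lookup ->
# web-search fallback for weak matches -> answer expansion/blending -> cache update.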
def handle_query(query: str, detail: bool = False) -> str:
    """
    Main function to process the query.
    :param query: The user's question.
    :param detail: Whether the user wants a more detailed response.
    :return: Response string from Daily Wellness AI.
    """
    if not query or not isinstance(query, str) or len(query.strip()) == 0:
        return "Please provide a valid question."
    try:
        # 1) Sanity Check: Determine if the question is relevant to daily wellness
        is_relevant = sanity_checker.is_relevant(query)
        if not is_relevant:
            return "Your question seems out of context or not related to daily wellness. Please ask a wellness-related question."
        # 2) Proceed with retrieval from the knowledge base
        retrieved = retriever.retrieve(query)
        # 3) Check the cache
        cached_answer = get_cached_answer(query)
        # 4) If no retrieved data from the knowledge base
        if not retrieved:
            # If we do have a cached answer, return it
            if cached_answer:
                logger.info("No relevant entries found in knowledge base. Returning cached answer.")
                return cached_answer
            # Otherwise, no KB results and no cache => no answer
            return "I'm sorry, I couldn't find an answer to your question."
        # 5) We have retrieved data; check whether the best match is strong enough.
        # retrieve() returns (answer, cross-encoder score) pairs sorted descending,
        # so this is a raw cross-encoder relevance score, not a cosine similarity.
        top_score = retrieved[0][1]
        score_threshold = 0.3  # Adjust this threshold based on empirical results
        if top_score < score_threshold:
            # (Weak match) Perform web search using manager_agent
            logger.info("Top retrieval score below threshold. Performing web search.")
            web_search_response = manager_agent.run(query)
            logger.debug(f"Web search response: {web_search_response}")
            # Combine any cached answer (if it exists) with the web result
            if cached_answer:
                blend_prompt = (
                    f"Combine the following previous answer with the new web results to create a more creative and accurate response. "
                    f"Do not include any of the previous prompt or instructions in your response. "
                    f"Add positivity and conclude with a short inspirational note.\n\n"
                    f"Previous Answer:\n{cached_answer}\n\n"
                    f"Web Results:\n{web_search_response}"
                )
                final_answer = llm._call(blend_prompt).strip()
            else:
                # If no cache, just return the web response
                final_answer = (
                    f"**Daily Wellness AI**\n\n"
                    f"{web_search_response}\n\n"
                    "Disclaimer: This information is retrieved from the web and is not a substitute for professional medical advice.\n\n"
                    "Wishing you a calm and wonderful day!"
                )
            # Store in cache
            store_in_cache(query, final_answer)
            return final_answer
        # 6) The match is strong enough, so finalize an answer from the knowledge base
        responses = [ans for ans, score in retrieved]
        # 6a) If we have a cached answer, blend it with the new knowledge-base data
        if cached_answer:
            blend_prompt = (
                f"Combine the previous answer with the newly retrieved answers to enhance creativity and accuracy. "
                f"Do not include any of the previous prompt or instructions in your response. "
                f"Add new insights, creativity, and conclude with a short inspirational note.\n\n"
                f"Previous Answer:\n{cached_answer}\n\n"
                f"New Retrieved Answers:\n" + "\n".join(f"- {r}" for r in responses)
            )
            final_answer = llm._call(blend_prompt).strip()
        else:
            # 6b) No cache => proceed with normal expansion
            final_answer = answer_expander.expand(query, responses, detail=detail)
        # 7) Store new or blended answer in cache
        store_in_cache(query, final_answer)
        return final_answer
    except Exception as e:
        logger.error(f"Error handling query: {e}")
        logger.debug("Exception details:", exc_info=True)
        return "An error occurred while processing your request."
###############################################################################
# 11) Gradio Interface
###############################################################################
def gradio_interface(query: str, detail: bool):
    """
    Gradio interface function that optionally takes a detail parameter for longer responses.
    """
    try:
        response = handle_query(query, detail=detail)
        return response  # handle_query already returns formatted text
    except Exception as e:
        logger.error(f"Error in Gradio interface: {e}")
        logger.debug("Exception details:", exc_info=True)
        return "**An error occurred while processing your request. Please try again later.**"
# ADDED: We now have a checkbox for detail in the Gradio UI
interface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(
            lines=2,
            placeholder="e.g., What is box breathing?",
            label="Ask Daily Wellness AI"
        ),
        gr.Checkbox(
            label="In-Depth Answer?",
            value=False,
            info="Check for a longer, more detailed response."
        )
    ],
    outputs=gr.Markdown(label="Answer from Daily Wellness AI"),
    title="Daily Wellness AI",
    description="Ask wellness-related questions and receive synthesized, creative answers. Optionally request a more in-depth response.",
    theme="default",
    examples=[
        ["What is box breathing and how does it help reduce anxiety?", True],
        ["Provide a daily wellness schedule incorporating box breathing techniques.", False],
        ["What are some tips for maintaining good posture while working at a desk?", True],
        ["Who is the CEO of Hugging Face?", False]  # Example of an out-of-context question
    ],
    allow_flagging="never"
)
###############################################################################
# 12) Launch Gradio
###############################################################################
if __name__ == "__main__":
try:
# For Hugging Face Spaces, set share=True to create a public link
interface.launch(server_name="0.0.0.0", server_port=7860, debug=False, share=True)
except Exception as e:
logger.error(f"Failed to launch Gradio interface: {e}")
logger.debug("Exception details:", exc_info=True)