|
|
|
|
|
import os |
|
import pandas as pd |
|
import chardet |
|
import logging |
|
import gradio as gr |
|
import json |
|
import hashlib |
|
import numpy as np |
|
from typing import Optional, List, Tuple, ClassVar, Dict |
|
|
|
from sentence_transformers import SentenceTransformer, util, CrossEncoder |
|
from langchain.llms.base import LLM |
|
import google.generativeai as genai |
|
|
|
|
|
from smolagents import CodeAgent, LiteLLMModel, DuckDuckGoSearchTool, ManagedAgent, HfApiModel |
|
|
|
|
|
|
|
|
|
# Module-wide logger. The root configuration below is set to INFO, so the
# logger.debug(...) calls in this file are only emitted if the level is lowered.
logger = logging.getLogger("Daily Wellness AI")
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
|
|
|
def clean_api_key(key: str) -> str:
    """Return *key* without non-ASCII characters and without outer whitespace."""
    ascii_chars = filter(str.isascii, key)
    return "".join(ascii_chars).strip()
|
|
|
|
|
# Read the Gemini key once at import time and fail fast when it is unusable.
# The value is cleaned *before* the emptiness check so that a key consisting
# only of whitespace or non-ASCII residue is rejected instead of being
# configured as an empty string.
gemini_api_key = clean_api_key(os.environ.get("GEMINI_API_KEY") or "")

if not gemini_api_key:
    logger.error("GEMINI_API_KEY environment variable not set.")
    raise EnvironmentError("Please set the GEMINI_API_KEY environment variable.")

logger.info("GEMINI API Key loaded successfully.")

try:
    genai.configure(api_key=gemini_api_key)
    logger.info("Configured Google Generative AI with provided API key.")
except Exception as e:
    # Log for operators, then let the original exception abort the import.
    logger.error(f"Failed to configure Google Generative AI: {e}")
    raise
|
|
|
class GeminiLLM(LLM):
    """LangChain ``LLM`` subclass that answers prompts with Google Gemini.

    Every call builds a ``genai.GenerativeModel``, opens a chat session with
    empty history, sends the prompt as one message and returns the reply text.
    """

    # Gemini model identifier shared by all instances.
    model_name: ClassVar[str] = "gemini-2.0-flash-exp"
    # Sampling parameters forwarded to Gemini in the generation config.
    temperature: float = 0.7
    top_p: float = 0.95
    top_k: int = 40
    # Sent to the API under the key ``max_output_tokens``.
    max_tokens: int = 2048

    @property
    def _llm_type(self) -> str:
        """Return the identifier string of this LLM implementation."""
        return "custom_gemini"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Send *prompt* to Gemini and return the text of the reply.

        :param prompt: Full prompt text to send as a single chat message.
        :param stop: Optional stop sequences. When given they are forwarded to
            the model as ``stop_sequences`` rather than being dropped.
        :return: ``response.text`` of the model reply.
        :raises Exception: Any error raised while creating the model or
            sending the message is logged and re-raised unchanged.
        """
        generation_config = {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "max_output_tokens": self.max_tokens,
        }
        if stop:
            # NOTE(review): the Gemini API limits how many stop sequences it
            # accepts; callers passing long lists should confirm the limit.
            generation_config["stop_sequences"] = list(stop)

        try:
            logger.debug(f"Initializing GenerativeModel with config: {generation_config}")
            model = genai.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
            )
            logger.debug("GenerativeModel initialized successfully.")

            # A fresh session per call: no conversation state is kept between prompts.
            chat_session = model.start_chat(history=[])
            logger.debug("Chat session started.")

            response = chat_session.send_message(prompt)
            logger.debug(f"Prompt sent to model: {prompt}")
            logger.debug(f"Raw response received: {response.text}")

            return response.text
        except Exception as e:
            logger.error(f"Error generating response with GeminiLLM: {e}")
            logger.debug("Exception details:", exc_info=True)
            raise
|
|
|
|
|
# Single Gemini client instance created at import time.
llm = GeminiLLM()
|
|
|
|
|
|
|
|
|
def load_csv(file_path: str):
    """Read the question/answer knowledge base from a CSV file.

    The encoding is guessed with chardet from the raw bytes before pandas
    parses the file. Rows lacking a 'Question' or an 'Answers' value are dropped.

    :param file_path: Path of the CSV file to read.
    :return: ``(questions, answers)`` as two parallel lists, or ``([], [])``
        when the file is missing, a required column is absent, or reading
        fails for any other reason (the problem is logged).
    """
    try:
        if not os.path.isfile(file_path):
            logger.error(f"CSV file does not exist: {file_path}")
            return [], []

        with open(file_path, 'rb') as raw_file:
            detection = chardet.detect(raw_file.read())
        frame = pd.read_csv(file_path, encoding=detection['encoding'])

        required_columns = ['Question', 'Answers']
        if any(column not in frame.columns for column in required_columns):
            raise ValueError("CSV must contain 'Question' and 'Answers' columns.")

        frame = frame.dropna(subset=required_columns)
        logger.info(f"Loaded {len(frame)} entries from {file_path}")

        questions = frame['Question'].tolist()
        answers = frame['Answers'].tolist()
        return questions, answers
    except Exception as e:
        logger.error(f"Error loading CSV: {e}")
        return [], []
|
|
|
|
|
# --- Knowledge base ---------------------------------------------------------
csv_file_path = "AIChatbot.csv"
corpus_questions, corpus_answers = load_csv(csv_file_path)
if not corpus_questions:
    # load_csv reports every failure as empty lists; nothing to serve without data.
    raise ValueError("Failed to load the knowledge base.")

# --- Bi-encoder used for embedding questions and queries -----------------------
embedding_model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
try:
    embedding_model = SentenceTransformer(embedding_model_name)
    logger.info(f"Loaded embedding model: {embedding_model_name}")
except Exception as e:
    logger.error(f"Failed to load embedding model: {e}")
    raise e

# Embed the whole question corpus once at import time.
try:
    question_embeddings = embedding_model.encode(
        corpus_questions,
        convert_to_tensor=True,
    )
    logger.info("Encoded question embeddings successfully.")
except Exception as e:
    logger.error(f"Failed to encode question embeddings: {e}")
    raise e

# --- Cross-encoder used to rerank the shortlisted candidates ------------------
cross_encoder_name = "cross-encoder/ms-marco-MiniLM-L-6-v2"
try:
    cross_encoder = CrossEncoder(cross_encoder_name)
    logger.info(f"Loaded cross-encoder model: {cross_encoder_name}")
except Exception as e:
    logger.error(f"Failed to load cross-encoder model: {e}")
    raise e
|
|
|
|
|
|
|
|
|
class EmbeddingRetriever:
    """Two-stage retriever: embedding-similarity shortlist, then cross-encoder rerank."""

    def __init__(self, questions, answers, embeddings, model, cross_encoder):
        """Keep references to the corpus, its embeddings and both models."""
        self.questions, self.answers = questions, answers
        self.embeddings = embeddings
        self.model = model
        self.cross_encoder = cross_encoder

    def retrieve(self, query: str, top_k: int = 3):
        """Return the best-matching answers for *query*.

        :param query: User question to match against the stored questions.
        :param top_k: Number of candidates kept after the similarity stage.
        :return: List of ``(answer, cross_encoder_score)`` tuples ordered by
            descending cross-encoder score; ``[]`` if anything fails.
        """
        try:
            query_vector = self.model.encode(query, convert_to_tensor=True)
            similarities = util.pytorch_cos_sim(query_vector, self.embeddings)[0].cpu().tolist()

            # Stage 1: shortlist by cosine similarity between query and stored questions.
            candidates = list(zip(self.questions, self.answers, similarities))
            candidates.sort(key=lambda row: row[2], reverse=True)
            shortlist = candidates[:top_k]

            # Stage 2: score each (query, stored question) pair with the cross-encoder.
            pairs = [[query, row[0]] for row in shortlist]
            rerank_scores = self.cross_encoder.predict(pairs)

            ranked = sorted(
                zip(shortlist, rerank_scores),
                key=lambda pair: pair[1],
                reverse=True,
            )
            final_retrieved = [(row[1], score) for row, score in ranked]
            logger.debug(f"Retrieved and reranked answers: {final_retrieved}")
            return final_retrieved
        except Exception as e:
            logger.error(f"Error during retrieval: {e}")
            logger.debug("Exception details:", exc_info=True)
            return []
|
|
|
# Retriever wired to the corpus, its precomputed embeddings and both models.
retriever = EmbeddingRetriever(
    questions=corpus_questions,
    answers=corpus_answers,
    embeddings=question_embeddings,
    model=embedding_model,
    cross_encoder=cross_encoder,
)
|
|
|
|
|
|
|
|
|
class QuestionSanityChecker:
    """Ask the LLM whether a user question is related to daily wellness."""

    def __init__(self, llm: "GeminiLLM"):
        self.llm = llm

    @staticmethod
    def _verdict_words(response: str) -> Tuple[bool, bool]:
        """Return ``(has_yes, has_no)`` for the whole words found in *response*.

        Matching is done on whitespace-separated words with surrounding
        punctuation removed, so "known", "not" or "eyes" no longer count as
        a "no"/"yes" the way plain substring matching did.
        """
        trim_chars = ".,!?:;'\"*()[]`"
        words = {word.strip(trim_chars) for word in response.lower().split()}
        return "yes" in words, "no" in words

    def is_relevant(self, question: str) -> bool:
        """Return True only when the LLM clearly answers "Yes".

        :param question: The user's question.
        :return: True for an unambiguous yes; False for a no, an ambiguous
            reply, or any error while calling the LLM.
        """
        prompt = (
            f"You are an assistant that determines whether a question is relevant to daily wellness.\n\n"
            f"Question: {question}\n\n"
            f"Is the above question relevant to daily wellness? Respond with 'Yes' or 'No' only."
        )
        try:
            response = self.llm._call(prompt)
            is_yes, is_no = self._verdict_words(response)
            logger.debug(f"Sanity check response: '{response}', interpreted as is_yes={is_yes}, is_no={is_no}")
            if is_yes and not is_no:
                return True
            elif is_no and not is_yes:
                return False
            else:
                # Neither or both verdict words present: treat as not relevant.
                logger.warning(f"Sanity check ambiguous response: '{response}'. Defaulting to 'No'.")
                return False
        except Exception as e:
            logger.error(f"Error in sanity check: {e}")
            logger.debug("Exception details:", exc_info=True)
            return False
|
|
|
|
|
# Relevance gate backed by the module-level Gemini client.
sanity_checker = QuestionSanityChecker(llm=llm)

# Model shared by both smolagents agents below (constructed with its defaults).
smol_model = HfApiModel()

# Web-search capability: a code agent whose only tool is DuckDuckGo search.
search_tool = DuckDuckGoSearchTool()
web_agent = CodeAgent(tools=[search_tool], model=smol_model)

# Wrap the search agent so the manager can address it by name.
managed_web_agent = ManagedAgent(
    agent=web_agent,
    name="web_search",
    description="Runs a web search for you. Provide your query as an argument.",
)

# Top-level agent: no tools of its own, it delegates to the managed web agent.
manager_agent = CodeAgent(
    tools=[],
    model=smol_model,
    managed_agents=[managed_web_agent],
)
|
|
|
|
|
|
|
|
|
class AnswerExpander:
    """Turn retrieved knowledge-base answers into one LLM-written reply."""

    def __init__(self, llm: "GeminiLLM"):
        self.llm = llm

    @staticmethod
    def _format_references(retrieved_answers: List[str]) -> str:
        """Render the answers as a bullet list numbered from 1.

        enumerate already starts at 1, so the index is used as-is; adding 1
        again (as before) made the list start at 2.
        """
        return "\n".join(
            f"- {number}) {ans}" for number, ans in enumerate(retrieved_answers, start=1)
        )

    def expand(self, query: str, retrieved_answers: List[str], detail: bool = False) -> str:
        """
        Synthesize answers into a single cohesive response.

        :param query: The user's question, embedded in the prompt.
        :param retrieved_answers: Answers to present to the LLM as references.
        :param detail: If True, ask for an in-depth response; otherwise ask
            for at most 4 sentences.
        :return: The stripped LLM response, or a fixed apology string when
            prompt construction or the LLM call fails.
        """
        try:
            reference_block = self._format_references(retrieved_answers)

            detail_instructions = (
                "Provide a thorough, in-depth explanation, adding relevant tips and context, "
                "while remaining creative and brand-aligned. "
                if detail else
                "Provide a concise response in no more than 4 sentences."
            )

            prompt = (
                f"You are Daily Wellness AI, a friendly wellness expert. Below are multiple "
                f"potential answers retrieved from a local knowledge base. You have a user question.\n\n"
                f"Question: {query}\n\n"
                f"Retrieved Answers:\n{reference_block}\n\n"
                f"Please synthesize these references into a single cohesive, creative, and brand-aligned response. "
                f"{detail_instructions} "
                f"End with a short inspirational note.\n\n"
                "Disclaimer: This is general wellness information, not a substitute for professional medical advice."
            )

            logger.debug(f"Generated prompt for answer expansion: {prompt}")
            response = self.llm._call(prompt)
            logger.debug(f"Expanded answer: {response}")
            return response.strip()
        except Exception as e:
            logger.error(f"Error expanding answer: {e}")
            logger.debug("Exception details:", exc_info=True)
            return "Sorry, an error occurred while generating a response."
|
|
|
# Expander instance backed by the module-level Gemini client.
answer_expander = AnswerExpander(llm=llm)

# JSON file (relative to the working directory) holding previously answered queries.
CACHE_FILE = "query_cache.json"
# Similarity score a cached query must reach to count as a match for a new query.
SIMILARITY_THRESHOLD_CACHE = 0.8
|
|
|
def load_cache() -> Dict:
    """Read the persisted cache from CACHE_FILE.

    :return: The decoded JSON content, or an empty dict when the file does
        not exist or cannot be read/parsed (the failure is logged).
    """
    if not os.path.isfile(CACHE_FILE):
        return {}

    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as cache_file:
            return json.load(cache_file)
    except Exception as e:
        logger.error(f"Failed to load cache file: {e}")
        return {}
|
|
|
def save_cache(cache_data: Dict):
    """Save the cache dictionary to CACHE_FILE as JSON.

    The data is written to a sibling temporary file and then moved over
    CACHE_FILE with ``os.replace``. A failure while serializing or writing
    therefore leaves the previous cache file untouched instead of truncated.
    Errors are logged and swallowed: persisting the cache is best-effort.

    :param cache_data: The complete cache mapping to persist.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cache_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, CACHE_FILE)
    except Exception as e:
        logger.error(f"Failed to save cache file: {e}")
        # Do not leave a partially written temporary file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
|
|
def compute_hash(text: str) -> str:
    """Return the hexadecimal MD5 digest of *text* encoded as UTF-8.

    Equal texts always map to the same digest, which is what the cache uses
    as its key.
    """
    digest = hashlib.md5()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
# In-memory cache, loaded from disk once at import time.
cache_store = load_cache()
|
|
|
|
|
|
|
|
|
def get_cached_answer(query: str) -> Optional[str]:
    """
    Returns a cached answer if there's a very similar query in the cache.

    The query is embedded and compared (cosine similarity) with the embedding
    stored for every cache entry; the answer of the best-scoring entry is
    returned when its score reaches SIMILARITY_THRESHOLD_CACHE.

    :param query: The user's question.
    :return: The cached answer of the closest stored query, or None when the
        cache is empty or nothing is similar enough.
    """
    if not cache_store:
        return None

    # Stored embeddings are plain Python lists turned into CPU arrays below.
    # Move the query embedding to the CPU too, so the comparison never mixes
    # devices when the embedding model runs on an accelerator.
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu()

    best_score = 0.0
    best_answer = None

    for entry in cache_store.values():
        try:
            stored_embedding = np.array(entry["embedding"], dtype=np.float32)
            candidate_answer = entry["answer"]
            score = util.pytorch_cos_sim(query_embedding, stored_embedding)[0].item()
        except (KeyError, TypeError, ValueError, RuntimeError) as e:
            # One damaged entry must not disable cache lookups altogether.
            logger.warning(f"Skipping malformed cache entry: {e}")
            continue

        if score > best_score:
            best_score = score
            best_answer = candidate_answer

    if best_score >= SIMILARITY_THRESHOLD_CACHE:
        logger.info(f"Cache hit! Similarity: {best_score:.2f}, returning cached answer.")
        return best_answer
    return None
|
|
|
def store_in_cache(query: str, answer: str):
    """
    Record *answer* for *query* in the cache and persist the cache.

    The entry is keyed by the hash of the query text and holds the query, the
    answer and the query embedding as a plain list (so it can be written as JSON).
    """
    embedding_tensor = embedding_model.encode(query, convert_to_tensor=True)
    entry = {
        "query": query,
        "answer": answer,
        "embedding": embedding_tensor.cpu().tolist(),
    }
    cache_store[compute_hash(query)] = entry
    save_cache(cache_store)
|
|
|
|
|
|
|
|
|
def _store_answer_safely(query: str, answer: str) -> None:
    """Cache a freshly generated answer without ever failing the request."""
    # The answer expander reports failures through this fixed message instead
    # of raising. Caching it (or an empty answer) would feed the error text
    # back into later blend prompts as a "previous answer".
    if not answer or answer == "Sorry, an error occurred while generating a response.":
        return
    try:
        store_in_cache(query, answer)
    except Exception as e:
        # Caching is an optimisation; the user still gets the answer.
        logger.error(f"Failed to cache answer: {e}")
        logger.debug("Exception details:", exc_info=True)


def handle_query(query: str, detail: bool = False) -> str:
    """
    Main function to process the query.

    Steps: validate the input, ask the LLM whether the question is
    wellness-related, retrieve knowledge-base answers, look for a similar
    cached answer, then either run a web search (top retrieval score below
    the threshold) or synthesize the retrieved answers. A cached answer, when
    present, is blended with the new material. The result is cached.

    :param query: The user's question.
    :param detail: Whether the user wants a more detailed response. It is only
        passed on when no cached answer exists and the retrieved answers are
        expanded; the blend and web-search prompts do not use it.
    :return: Response string from Daily Wellness AI.
    """
    if not isinstance(query, str) or not query.strip():
        return "Please provide a valid question."

    try:
        if not sanity_checker.is_relevant(query):
            return "Your question seems out of context or not related to daily wellness. Please ask a wellness-related question."

        retrieved = retriever.retrieve(query)
        cached_answer = get_cached_answer(query)

        if not retrieved:
            if cached_answer:
                logger.info("No relevant entries found in knowledge base. Returning cached answer.")
                return cached_answer
            return "I'm sorry, I couldn't find an answer to your question."

        # NOTE(review): the score in `retrieved` comes from the cross-encoder
        # rerank, not from cosine similarity; confirm 0.3 suits that scale.
        top_score = retrieved[0][1]
        similarity_threshold = 0.3

        if top_score < similarity_threshold:
            logger.info("Similarity score below threshold. Performing web search.")
            web_search_response = manager_agent.run(query)
            logger.debug(f"Web search response: {web_search_response}")

            if cached_answer:
                blend_prompt = (
                    f"Combine the following previous answer with the new web results to create a more creative and accurate response. "
                    f"Do not include any of the previous prompt or instructions in your response. "
                    f"Add positivity and conclude with a short inspirational note.\n\n"
                    f"Previous Answer:\n{cached_answer}\n\n"
                    f"Web Results:\n{web_search_response}"
                )
                final_answer = llm._call(blend_prompt).strip()
            else:
                final_answer = (
                    f"**Daily Wellness AI**\n\n"
                    f"{web_search_response}\n\n"
                    "Disclaimer: This information is retrieved from the web and is not a substitute for professional medical advice.\n\n"
                    "Wishing you a calm and wonderful day!"
                )

            _store_answer_safely(query, final_answer)
            return final_answer

        responses = [ans for ans, _score in retrieved]

        if cached_answer:
            blend_prompt = (
                f"Combine the previous answer with the newly retrieved answers to enhance creativity and accuracy. "
                f"Do not include any of the previous prompt or instructions in your response. "
                f"Add new insights, creativity, and conclude with a short inspirational note.\n\n"
                f"Previous Answer:\n{cached_answer}\n\n"
                f"New Retrieved Answers:\n" + "\n".join(f"- {r}" for r in responses)
            )
            final_answer = llm._call(blend_prompt).strip()
        else:
            final_answer = answer_expander.expand(query, responses, detail=detail)

        _store_answer_safely(query, final_answer)
        return final_answer

    except Exception as e:
        logger.error(f"Error handling query: {e}")
        logger.debug("Exception details:", exc_info=True)
        return "An error occurred while processing your request."
|
|
|
|
|
|
|
|
|
|
|
def gradio_interface(query: str, detail: bool):
    """
    Callback used by the Gradio UI.

    :param query: Text entered by the user.
    :param detail: State of the "in-depth" checkbox, forwarded to handle_query.
    :return: The answer text, or a bold error message if handle_query raises.
    """
    try:
        return handle_query(query, detail=detail)
    except Exception as e:
        logger.error(f"Error in Gradio interface: {e}")
        logger.debug("Exception details:", exc_info=True)
        return "**An error occurred while processing your request. Please try again later.**"
|
|
|
|
|
# Input widgets: the question box and the "in-depth" toggle, in callback order.
_question_box = gr.Textbox(
    lines=2,
    placeholder="e.g., What is box breathing?",
    label="Ask Daily Wellness AI",
)
_detail_toggle = gr.Checkbox(
    label="In-Depth Answer?",
    value=False,
    info="Check for a longer, more detailed response.",
)

# Each example row is [question, in-depth flag], matching the two inputs above.
_example_rows = [
    ["What is box breathing and how does it help reduce anxiety?", True],
    ["Provide a daily wellness schedule incorporating box breathing techniques.", False],
    ["What are some tips for maintaining good posture while working at a desk?", True],
    ["Who is the CEO of Hugging Face?", False],
]

interface = gr.Interface(
    fn=gradio_interface,
    inputs=[_question_box, _detail_toggle],
    outputs=gr.Markdown(label="Answer from Daily Wellness AI"),
    title="Daily Wellness AI",
    description="Ask wellness-related questions and receive synthesized, creative answers. Optionally request a more in-depth response.",
    theme="default",
    examples=_example_rows,
    allow_flagging="never",
)


if __name__ == "__main__":
    # Listen on all interfaces, port 7860, and request a public share link.
    launch_options = dict(server_name="0.0.0.0", server_port=7860, debug=False, share=True)
    try:
        interface.launch(**launch_options)
    except Exception as e:
        logger.error(f"Failed to launch Gradio interface: {e}")
        logger.debug("Exception details:", exc_info=True)
|
|