import gradio as gr
import pinecone
import requests
import PyPDF2
from transformers import AutoTokenizer, AutoModel
import torch
import re
import google.generativeai as genai
import os
import time
from datetime import datetime, timedelta
from google.api_core import exceptions

# Constants
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")  # Set in HF Spaces Secrets
PINECONE_INDEX_NAME = "diabetes-bot"
PINECONE_NAMESPACE = "general"
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # Set in HF Spaces Secrets
MODEL_NAME = "dmis-lab/biobert-base-cased-v1.1"

# Free tier limits
FREE_TIER_RPD_LIMIT = 1000  # Requests per day
FREE_TIER_RPM_LIMIT = 15  # Requests per minute
FREE_TIER_TPM_LIMIT = 1000000  # Tokens per minute
WARNING_THRESHOLD = 0.9  # Stop at 90% of the limit to be safe

# Usage tracking
usage_file = "usage.txt"

def load_usage():
    if not os.path.exists(usage_file):
        return {"requests": [], "tokens": []}
    with open(usage_file, "r") as f:
        data = f.read().strip()
    if not data:
        return {"requests": [], "tokens": []}
    requests, tokens = data.split("|")
    return {
        "requests": [float(t) for t in requests.split(",") if t],
        "tokens": [(float(t), float(n)) for t, n in [pair.split(":") for pair in tokens.split(",") if pair]]
    }

def save_usage(requests, tokens):
    with open(usage_file, "w") as f:
        f.write(",".join(map(str, requests)) + "|" + ",".join(f"{t}:{n}" for t, n in tokens))
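# Note on the on-disk format: the two helpers above persist usage in usage.txt as two
# "|"-separated fields — comma-joined request timestamps, then comma-joined
# "timestamp:token_count" pairs. Illustrative contents (hypothetical values):
#   1700000000.0,1700000030.5|1700000000.0:812,1700000030.5:540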
def check_usage():
    usage = load_usage()
    now = time.time()

    # Clean up old requests (older than 24 hours)
    day_ago = now - 24 * 60 * 60
    usage["requests"] = [t for t in usage["requests"] if t > day_ago]

    # Clean up old token counts (older than 1 minute)
    minute_ago = now - 60
    usage["tokens"] = [(t, n) for t, n in usage["tokens"] if t > minute_ago]

    # Count requests per day
    rpd = len(usage["requests"])
    rpd_limit = int(FREE_TIER_RPD_LIMIT * WARNING_THRESHOLD)
    if rpd >= rpd_limit:
        return False, f"Approaching daily request limit ({rpd}/{FREE_TIER_RPD_LIMIT}). Stopping to stay in free tier. Try again tomorrow."

    # Count requests per minute
    rpm = len([t for t in usage["requests"] if t > minute_ago])
    rpm_limit = int(FREE_TIER_RPM_LIMIT * WARNING_THRESHOLD)
    if rpm >= rpm_limit:
        return False, f"Approaching minute request limit ({rpm}/{FREE_TIER_RPM_LIMIT}). Wait a minute and try again."

    # Count tokens per minute
    tpm = sum(n for t, n in usage["tokens"])
    tpm_limit = int(FREE_TIER_TPM_LIMIT * WARNING_THRESHOLD)
    if tpm >= tpm_limit:
        return False, f"Approaching token limit ({tpm}/{FREE_TIER_TPM_LIMIT} per minute). Wait a minute and try again."

    return True, (rpd, rpm, tpm)

# Initialize Pinecone
pc = pinecone.Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX_NAME)

# Initialize BioBERT for embedding queries
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)
if torch.cuda.is_available():
    model.cuda()

# Initialize Gemini and check available models
genai.configure(api_key=GEMINI_API_KEY)

# List available models to confirm free tier access (names are returned with a "models/" prefix)
available_models = [m.name for m in genai.list_models()]
print("Available Gemini models:", available_models)

preferred_model = "gemini-pro"  # Use the generally available model
if f"models/{preferred_model}" in available_models:
    gemini_model = genai.GenerativeModel(f"models/{preferred_model}")
    print(f"Using model: models/{preferred_model}")
else:
    # Try other available models (if needed)
    for model_name in ["gemini-2.0-flash", "gemini-1.5-pro"]:
        if f"models/{model_name}" in available_models:
            gemini_model = genai.GenerativeModel(f"models/{model_name}")
            print(f"Using model: models/{model_name}")
            break  # Use the first available match
    else:
        raise ValueError("No suitable Gemini model available. Available models: " + str(available_models))

# Clean text
def clean_text(text):
    text = re.sub(r'<[^>]+>', '', text)  # Remove HTML tags
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  # Remove non-ASCII
    text = re.sub(r'\s+', ' ', text)  # Normalize spaces
    return text.strip()

# Embed text using BioBERT
def embed_text(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    if torch.cuda.is_available():
        inputs = {k: v.cuda() for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]  # [CLS] token embedding
    return embedding.tolist()

# Extract text from PDF (up to 10 pages)
def extract_pdf_text(pdf_file):
    reader = PyPDF2.PdfReader(pdf_file)
    num_pages = min(len(reader.pages), 10)  # Limit to 10 pages
    text = ""
    for page in range(num_pages):
        text += (reader.pages[page].extract_text() or "") + "\n"  # extract_text() may return None
    return clean_text(text)

# Retrieve relevant chunks from Pinecone
def retrieve_from_pinecone(query, top_k=5):
    query_embedding = embed_text(query)
    results = index.query(
        namespace=PINECONE_NAMESPACE,
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )
    retrieved_chunks = [match["metadata"]["chunk"] for match in results["matches"]]
    return "\n".join(retrieved_chunks)
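# Illustrative only — not called anywhere in this app. retrieve_from_pinecone() assumes the
# "diabetes-bot" index was populated with BioBERT embeddings and that each vector stores the
# original text under a "chunk" metadata key. A minimal ingestion sketch under that assumption
# (the function name and ID scheme below are hypothetical):
def _upsert_chunks_sketch(chunks):
    """Upsert text chunks in the shape retrieve_from_pinecone() expects."""
    vectors = [
        {
            "id": f"chunk-{i}",            # hypothetical ID scheme
            "values": embed_text(chunk),   # 768-dim BioBERT [CLS] embedding
            "metadata": {"chunk": chunk},  # text returned at query time
        }
        for i, chunk in enumerate(chunks)
    ]
    index.upsert(vectors=vectors, namespace=PINECONE_NAMESPACE)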
# Count tokens using Gemini API
def count_tokens(text):
    try:
        response = gemini_model.count_tokens(text)
        return response.total_tokens
    except exceptions.ResourceExhausted:  # quota exceeded (HTTP 429)
        return 0  # If quota is exceeded, return 0 to avoid counting issues

# Generate answer using Gemini
def generate_answer(query, context):
    prompt = f"""
You are a diabetes research assistant. Answer the following question based on the provided context. If the context is insufficient, use your knowledge to provide a helpful answer, but note if the information might be limited.

**Question**: {query}

**Context**: {context}

**Answer**:
"""
    try:
        response = gemini_model.generate_content(prompt)
        return response.text
    except exceptions.ResourceExhausted as e:  # quota exceeded (HTTP 429)
        return f"Error: Gemini API quota exceeded ({str(e)}). Try again later."
    except Exception as e:
        return f"Error generating answer: {str(e)}"

# Main function to handle user input
def diabetes_bot(query, pdf_file=None):
    # Check usage limits
    can_proceed, usage_info = check_usage()
    if not can_proceed:
        return usage_info

    # Step 1: Get context from PDF if uploaded
    pdf_context = ""
    if pdf_file is not None:
        pdf_context = extract_pdf_text(pdf_file)
        if pdf_context:
            pdf_context = f"Uploaded PDF content:\n{pdf_context}\n\n"

    # Step 2: Retrieve relevant chunks from Pinecone
    pinecone_context = retrieve_from_pinecone(query)
    if pinecone_context:
        pinecone_context = f"Pinecone retrieved content (latest research, 2010 onward):\n{pinecone_context}\n\n"

    # Step 3: Combine contexts
    full_context = pdf_context + pinecone_context
    if not full_context.strip():
        full_context = "No relevant context found in Pinecone or uploaded PDF."

    # Step 4: Count tokens for the prompt
    prompt = f"""
You are a diabetes research assistant. Answer the following question based on the provided context. If the context is insufficient, use your knowledge to provide a helpful answer, but note if the information might be limited.

**Question**: {query}

**Context**: {full_context}

**Answer**:
"""
    input_tokens = count_tokens(prompt)
    if input_tokens == 0:  # Quota exceeded during token counting
        return "Error: Gemini API quota exceeded while counting tokens. Try again later."

    # Update usage
    usage = load_usage()
    now = time.time()
    usage["requests"].append(now)
    usage["tokens"].append((now, input_tokens))
    save_usage(usage["requests"], usage["tokens"])

    # Step 5: Generate answer using Gemini
    answer = generate_answer(query, full_context)

    # Step 6: Count output tokens and update usage
    output_tokens = count_tokens(answer)
    if output_tokens == 0:  # Quota exceeded during output token counting
        return answer + "\n\nError: Gemini API quota exceeded while counting output tokens. Usage stats may be incomplete."
    usage = load_usage()
    usage["tokens"].append((now, output_tokens))
    save_usage(usage["requests"], usage["tokens"])

    # Step 7: Show usage stats. Re-check defensively: this request may have tipped a limit,
    # in which case check_usage() returns a message string instead of the counters.
    ok, info = check_usage()
    if not ok:
        return answer + "\n\n" + info
    rpd, rpm, tpm = info
    usage_message = f"\n\nUsage: {rpd}/{FREE_TIER_RPD_LIMIT} requests today, {rpm}/{FREE_TIER_RPM_LIMIT} requests this minute, {tpm}/{FREE_TIER_TPM_LIMIT} tokens this minute."
    return answer + usage_message

# Gradio interface
def chat_wrapper(query, pdf, history):
    # Initialize history if empty
    if history is None:
        history = []

    # If no query is provided, return the current history without changes
    if query.strip() == "":
        return history, "", None, history

    # Call diabetes_bot to generate an answer
    answer = diabetes_bot(query, pdf)

    # Append the new interaction as message-style dicts (role/content)
    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": answer})

    # Return the updated conversation and clear the query and pdf inputs
    return history, "", None, history

def clear_all():
    # Clear conversation history and inputs
    return [], "", None, []

with gr.Blocks() as app:
    gr.HTML(
        """

        <h1>Diabetes Research ChatBot, powered by Gemini 2.0 Flash and Pinecone 🩺</h1>
        <p>Powered by the latest diabetes research, running on the Gemini 2.0 Flash API.</p>
        <p>Ask questions about diabetes directly or upload a research paper (up to 10 pages) for specific Q&amp;A.</p>
        <p><em>Disclaimer: The information provided by this chatbot is for research and informational purposes only and is not intended to substitute for professional medical advice, diagnosis, or treatment. Always seek the advice of your physician or other qualified health provider with any questions you may have regarding a medical condition.</em></p>

""" ) # Create a Chatbot component with type set to "messages" and a specified height chatbot = gr.Chatbot(label="Conversation", type="messages", height=370) # Input row for query and PDF file (with PDF box sized smaller) with gr.Row(): query_input = gr.Textbox(label="Ask a Question", placeholder="Type your diabetes-related query here...", lines=2) with gr.Column(scale=0.2): pdf_input = gr.File(label="Upload a PDF (optional, max 10 pages)", file_types=[".pdf"]) # Row for Submit and Clear buttons with gr.Row(): submit_button = gr.Button("Ask", variant="primary") clear_button = gr.Button("Clear") # State to maintain conversation history state = gr.State([]) # On submit, update the conversation and clear inputs; outputs: chatbot, query_input, pdf_input, state submit_button.click( fn=chat_wrapper, inputs=[query_input, pdf_input, state], outputs=[chatbot, query_input, pdf_input, state] ) # Clear all components including conversation history clear_button.click( fn=clear_all, inputs=[], outputs=[chatbot, query_input, pdf_input, state] ) app.launch()