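"""Diabetes research chatbot for Hugging Face Spaces.

Gradio app that answers diabetes questions via retrieval-augmented generation:
queries are embedded with BioBERT, matched against a Pinecone index of research
chunks, optionally combined with text from an uploaded PDF, and answered by a
Gemini model while tracking free-tier API usage.
"""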
import gradio as gr
import pinecone
import PyPDF2
from transformers import AutoTokenizer, AutoModel
import torch
import re
import google.generativeai as genai
import os
import time
from google.api_core import exceptions
# Constants
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") # Set in HF Spaces Secrets
PINECONE_INDEX_NAME = "diabetes-bot"
PINECONE_NAMESPACE = "general"
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") # Set in HF Spaces Secrets
MODEL_NAME = "dmis-lab/biobert-base-cased-v1.1"
# Free tier limits
FREE_TIER_RPD_LIMIT = 1000 # Requests per day
FREE_TIER_RPM_LIMIT = 15 # Requests per minute
FREE_TIER_TPM_LIMIT = 1000000 # Tokens per minute
WARNING_THRESHOLD = 0.9 # Stop at 90% of the limit to be safe
# Usage tracking
usage_file = "usage.txt"
def load_usage():
    if not os.path.exists(usage_file):
        return {"requests": [], "tokens": []}
    with open(usage_file, "r") as f:
        data = f.read().strip()
    if not data:
        return {"requests": [], "tokens": []}
    request_part, token_part = data.split("|")
    return {
        "requests": [float(t) for t in request_part.split(",") if t],
        "tokens": [(float(t), float(n)) for t, n in [pair.split(":") for pair in token_part.split(",") if pair]]
    }
def save_usage(requests, tokens):
    with open(usage_file, "w") as f:
        f.write(",".join(map(str, requests)) + "|" + ",".join(f"{t}:{n}" for t, n in tokens))
def check_usage():
    usage = load_usage()
    now = time.time()
    # Drop requests older than 24 hours
    day_ago = now - 24 * 60 * 60
    usage["requests"] = [t for t in usage["requests"] if t > day_ago]
    # Drop token counts older than 1 minute
    minute_ago = now - 60
    usage["tokens"] = [(t, n) for t, n in usage["tokens"] if t > minute_ago]
    # Requests per day
    rpd = len(usage["requests"])
    rpd_limit = int(FREE_TIER_RPD_LIMIT * WARNING_THRESHOLD)
    if rpd >= rpd_limit:
        return False, f"Approaching daily request limit ({rpd}/{FREE_TIER_RPD_LIMIT}). Stopping to stay in the free tier. Try again tomorrow."
    # Requests per minute
    rpm = len([t for t in usage["requests"] if t > minute_ago])
    rpm_limit = int(FREE_TIER_RPM_LIMIT * WARNING_THRESHOLD)
    if rpm >= rpm_limit:
        return False, f"Approaching per-minute request limit ({rpm}/{FREE_TIER_RPM_LIMIT}). Wait a minute and try again."
    # Tokens per minute
    tpm = sum(n for t, n in usage["tokens"])
    tpm_limit = int(FREE_TIER_TPM_LIMIT * WARNING_THRESHOLD)
    if tpm >= tpm_limit:
        return False, f"Approaching token limit ({tpm}/{FREE_TIER_TPM_LIMIT} per minute). Wait a minute and try again."
    return True, (rpd, rpm, tpm)
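# check_usage() returns (True, (rpd, rpm, tpm)) when under all limits,
# or (False, "<warning message>") once any limit nears its threshold.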
# Initialize Pinecone
pc = pinecone.Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX_NAME)
# Initialize BioBERT for embedding queries
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)
if torch.cuda.is_available():
    model.cuda()
# Initialize Gemini and check available models
genai.configure(api_key=GEMINI_API_KEY)
# List available models to confirm free-tier access;
# list_models() returns names prefixed with "models/", so compare against that form
available_models = [m.name for m in genai.list_models()]
print("Available Gemini models:", available_models)
preferred_model = "gemini-pro"  # The generally available model
if f"models/{preferred_model}" in available_models:
    gemini_model = genai.GenerativeModel(preferred_model)
    print(f"Using model: {preferred_model}")
else:
    # Fall back to other known models
    for model_name in ["gemini-2.0-flash", "gemini-1.5-pro"]:
        if f"models/{model_name}" in available_models:
            gemini_model = genai.GenerativeModel(f"models/{model_name}")
            print(f"Using model: models/{model_name}")
            break  # Use the first available match
    else:
        raise ValueError("No suitable Gemini model available. Available models: " + str(available_models))
# Clean text
def clean_text(text):
    text = re.sub(r'<[^>]+>', '', text)         # Remove HTML tags
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  # Replace non-ASCII with spaces
    text = re.sub(r'\s+', ' ', text)            # Normalize whitespace
    return text.strip()
# Embed text using BioBERT
def embed_text(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    if torch.cuda.is_available():
        inputs = {k: v.cuda() for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]  # [CLS] token vector
    return embedding.tolist()
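# Note: BioBERT-base embeddings are 768-dimensional, so the Pinecone index is
# assumed to have been created with dimension 768 and populated by a separate
# ingestion script (not shown here).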
# Extract text from PDF (up to 10 pages)
def extract_pdf_text(pdf_file):
    reader = PyPDF2.PdfReader(pdf_file)
    num_pages = min(len(reader.pages), 10)  # Limit to 10 pages
    text = ""
    for page in range(num_pages):
        # extract_text() can return None (e.g. on scanned pages), so guard with ""
        text += (reader.pages[page].extract_text() or "") + "\n"
    return clean_text(text)
# Retrieve relevant chunks from Pinecone
def retrieve_from_pinecone(query, top_k=5):
    query_embedding = embed_text(query)
    results = index.query(
        namespace=PINECONE_NAMESPACE,
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )
    retrieved_chunks = [match["metadata"]["chunk"] for match in results["matches"]]
    return "\n".join(retrieved_chunks)
# Count tokens using the Gemini API
def count_tokens(text):
    try:
        response = gemini_model.count_tokens(text)
        return response.total_tokens
    except exceptions.ResourceExhausted:
        # google.api_core maps quota errors to ResourceExhausted (HTTP 429)
        return 0  # Signal to the caller that counting failed due to quota
# Generate answer using Gemini
def generate_answer(query, context):
    prompt = f"""
You are a diabetes research assistant. Answer the following question based on the provided context. If the context is insufficient, use your knowledge to provide a helpful answer, but note if the information might be limited.

**Question**: {query}

**Context**:
{context}

**Answer**:
"""
    try:
        response = gemini_model.generate_content(prompt)
        return response.text
    except exceptions.ResourceExhausted as e:
        return f"Error: Gemini API quota exceeded ({str(e)}). Try again later."
    except Exception as e:
        return f"Error generating answer: {str(e)}"
# Main function to handle user input
def diabetes_bot(query, pdf_file=None):
    # Check usage limits before doing any work
    can_proceed, usage_info = check_usage()
    if not can_proceed:
        return usage_info

    # Step 1: Get context from the uploaded PDF, if any
    pdf_context = ""
    if pdf_file is not None:
        pdf_context = extract_pdf_text(pdf_file)
        if pdf_context:
            pdf_context = f"Uploaded PDF content:\n{pdf_context}\n\n"

    # Step 2: Retrieve relevant chunks from Pinecone
    pinecone_context = retrieve_from_pinecone(query)
    if pinecone_context:
        pinecone_context = f"Pinecone retrieved content (latest research, 2010 onward):\n{pinecone_context}\n\n"

    # Step 3: Combine contexts
    full_context = pdf_context + pinecone_context
    if not full_context.strip():
        full_context = "No relevant context found in Pinecone or the uploaded PDF."

    # Step 4: Count input tokens (mirrors the prompt built in generate_answer)
    prompt = f"""
You are a diabetes research assistant. Answer the following question based on the provided context. If the context is insufficient, use your knowledge to provide a helpful answer, but note if the information might be limited.

**Question**: {query}

**Context**:
{full_context}

**Answer**:
"""
    input_tokens = count_tokens(prompt)
    if input_tokens == 0:  # Quota exceeded during token counting
        return "Error: Gemini API quota exceeded while counting tokens. Try again later."

    # Record the request and its input tokens
    usage = load_usage()
    now = time.time()
    usage["requests"].append(now)
    usage["tokens"].append((now, input_tokens))
    save_usage(usage["requests"], usage["tokens"])

    # Step 5: Generate the answer with Gemini
    answer = generate_answer(query, full_context)

    # Step 6: Count output tokens and update usage
    output_tokens = count_tokens(answer)
    if output_tokens == 0:  # Quota exceeded during output token counting
        return answer + "\n\nError: Gemini API quota exceeded while counting output tokens. Usage stats may be incomplete."
    usage = load_usage()
    usage["tokens"].append((now, output_tokens))
    save_usage(usage["requests"], usage["tokens"])

    # Step 7: Report usage stats; check_usage() returns a warning string instead
    # of numbers if a limit was crossed mid-request, so handle both shapes
    ok, info = check_usage()
    if ok:
        rpd, rpm, tpm = info
        usage_message = f"\n\nUsage: {rpd}/{FREE_TIER_RPD_LIMIT} requests today, {rpm}/{FREE_TIER_RPM_LIMIT} requests this minute, {tpm}/{FREE_TIER_TPM_LIMIT} tokens this minute."
    else:
        usage_message = f"\n\n{info}"
    return answer + usage_message
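# Note: usage.txt is read, modified, and rewritten without locking, so
# concurrent requests could race; fine for a single-user Space, but a shared
# deployment would want file locking or a proper datastore.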
# Gradio interface
def chat_wrapper(query, pdf, history):
    # Initialize history if empty
    if history is None:
        history = []
    # If no query is provided, return the current history unchanged
    if query.strip() == "":
        return history, "", None, history
    # Call diabetes_bot to generate an answer
    answer = diabetes_bot(query, pdf)
    # Append the new interaction as message-style dicts (role/content)
    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": answer})
    # Return the updated conversation and clear the query and PDF inputs
    return history, "", None, history

def clear_all():
    # Clear conversation history and inputs
    return [], "", None, []
with gr.Blocks() as app:
    gr.HTML(
        """
        <h1 style="text-align:center;">Diabetes Research ChatBot Powered by Gemini 2.0 Flash and Pinecone 🩺</h1>
        <p style="text-align:center;"><strong>Powered by the latest diabetes research, running on the Gemini 2.0 Flash API</strong></p>
        <p style="text-align:center;">Ask questions about diabetes directly, or upload a research paper (up to 10 pages) for paper-specific Q&A.</p>
        <br>
        <div style="border: 1px solid #ccc; border-radius: 5px; padding: 10px; background-color: #f9f9f9; margin:auto; width:80%;">
            <strong>Disclaimer:</strong>
            The information provided by this chatbot is for research and informational purposes only and is not intended to substitute professional medical advice, diagnosis, or treatment. Always seek the advice of your physician or other qualified health provider with any questions you may have regarding a medical condition.
        </div>
        <br>
        """
    )
    # Chatbot component using message-style history
    chatbot = gr.Chatbot(label="Conversation", type="messages", height=370)
    # Input row for the query and the optional PDF (integer scales keep the PDF column narrow)
    with gr.Row():
        with gr.Column(scale=4):
            query_input = gr.Textbox(label="Ask a Question", placeholder="Type your diabetes-related query here...", lines=2)
        with gr.Column(scale=1):
            pdf_input = gr.File(label="Upload a PDF (optional, max 10 pages)", file_types=[".pdf"])
    # Row for the Submit and Clear buttons
    with gr.Row():
        submit_button = gr.Button("Ask", variant="primary")
        clear_button = gr.Button("Clear")
    # State to maintain conversation history
    state = gr.State([])
    # On submit, update the conversation and clear the inputs
    submit_button.click(
        fn=chat_wrapper,
        inputs=[query_input, pdf_input, state],
        outputs=[chatbot, query_input, pdf_input, state]
    )
    # Clear all components, including the conversation history
    clear_button.click(
        fn=clear_all,
        inputs=[],
        outputs=[chatbot, query_input, pdf_input, state]
    )

app.launch()