import gradio as gr
import pinecone
import requests
import PyPDF2
from transformers import AutoTokenizer, AutoModel
import torch
import re
import google.generativeai as genai
import os
import time
from datetime import datetime, timedelta
from google.api_core import exceptions
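
# Configuration: both API keys are read from environment variables, so PINECONE_API_KEY and
# GEMINI_API_KEY must be set before the app is launched.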
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_NAME = "diabetes-bot"
PINECONE_NAMESPACE = "general"
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
MODEL_NAME = "dmis-lab/biobert-base-cased-v1.1"
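
# Self-imposed caps mirroring the Gemini free-tier quotas assumed by this app (requests per day,
# requests per minute, tokens per minute); calls stop once usage reaches 90% of a cap
# (WARNING_THRESHOLD). Check the current Gemini rate-limit documentation for up-to-date values.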
FREE_TIER_RPD_LIMIT = 1000
FREE_TIER_RPM_LIMIT = 15
FREE_TIER_TPM_LIMIT = 1000000
WARNING_THRESHOLD = 0.9

usage_file = "usage.txt"
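
# usage.txt stores request timestamps, then "timestamp:token_count" pairs, separated by "|",
# e.g. (illustrative) "1710000000.0,1710000050.0|1710000000.0:120,1710000050.0:95".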
def load_usage():
    if not os.path.exists(usage_file):
        return {"requests": [], "tokens": []}
    with open(usage_file, "r") as f:
        data = f.read().strip()
    if not data:
        return {"requests": [], "tokens": []}
    requests, tokens = data.split("|")
    return {
        "requests": [float(t) for t in requests.split(",") if t],
        "tokens": [(float(t), float(n)) for t, n in [pair.split(":") for pair in tokens.split(",") if pair]]
    }


def save_usage(requests, tokens):
    with open(usage_file, "w") as f:
        f.write(",".join(map(str, requests)) + "|" + ",".join(f"{t}:{n}" for t, n in tokens))

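
# check_usage prunes entries older than the relevant window and returns either
# (False, warning_message) when a cap is near, or (True, (rpd, rpm, tpm)) with the current counts.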
def check_usage():
    usage = load_usage()
    now = time.time()

    # Keep only requests from the last 24 hours and token records from the last minute.
    day_ago = now - 24 * 60 * 60
    usage["requests"] = [t for t in usage["requests"] if t > day_ago]

    minute_ago = now - 60
    usage["tokens"] = [(t, n) for t, n in usage["tokens"] if t > minute_ago]

    rpd = len(usage["requests"])
    rpd_limit = int(FREE_TIER_RPD_LIMIT * WARNING_THRESHOLD)
    if rpd >= rpd_limit:
        return False, f"Approaching daily request limit ({rpd}/{FREE_TIER_RPD_LIMIT}). Stopping to stay in free tier. Try again tomorrow."

    rpm = len([t for t in usage["requests"] if t > minute_ago])
    rpm_limit = int(FREE_TIER_RPM_LIMIT * WARNING_THRESHOLD)
    if rpm >= rpm_limit:
        return False, f"Approaching minute request limit ({rpm}/{FREE_TIER_RPM_LIMIT}). Wait a minute and try again."

    tpm = sum(n for t, n in usage["tokens"])
    tpm_limit = int(FREE_TIER_TPM_LIMIT * WARNING_THRESHOLD)
    if tpm >= tpm_limit:
        return False, f"Approaching token limit ({tpm}/{FREE_TIER_TPM_LIMIT} per minute). Wait a minute and try again."

    return True, (rpd, rpm, tpm)

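
# Connect to an existing Pinecone index. The index is assumed to have been built with the same
# BioBERT embeddings used below (768-dimensional vectors), with the source text kept in metadata.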
pc = pinecone.Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX_NAME)
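
# Load BioBERT once at startup and move it to the GPU when available; it is used purely as an
# encoder (no fine-tuning), so gradients are never needed.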
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)
if torch.cuda.is_available():
    model.cuda()

genai.configure(api_key=GEMINI_API_KEY)

# genai.list_models() returns fully qualified names such as "models/gemini-2.0-flash", so the
# preferred model is checked and instantiated with the "models/" prefix as well.
available_models = [m.name for m in genai.list_models()]
print("Available Gemini models:", available_models)

preferred_model = "gemini-pro"
if f"models/{preferred_model}" in available_models:
    gemini_model = genai.GenerativeModel(f"models/{preferred_model}")
    print(f"Using model: {preferred_model}")
else:
    for model_name in ["gemini-2.0-flash", "gemini-1.5-pro"]:
        if f"models/{model_name}" in available_models:
            gemini_model = genai.GenerativeModel(f"models/{model_name}")
            print(f"Using model: models/{model_name}")
            break
    else:
        raise ValueError("No suitable Gemini model available. Available models: " + str(available_models))

def clean_text(text):
    text = re.sub(r'<[^>]+>', '', text)
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

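
# embed_text returns the 768-dimensional [CLS] embedding as a plain Python list, the format
# Pinecone's query API accepts for the vector argument; it must match how the chunks were indexed.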
def embed_text(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    if torch.cuda.is_available():
        inputs = {k: v.cuda() for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]
    return embedding.tolist()


def extract_pdf_text(pdf_file):
    reader = PyPDF2.PdfReader(pdf_file)
    num_pages = min(len(reader.pages), 10)
    text = ""
    for page in range(num_pages):
        text += reader.pages[page].extract_text() + "\n"
    return clean_text(text)

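
# Each Pinecone match is assumed to carry its source text under metadata["chunk"] (set when the
# index was populated); the retrieved chunks are simply concatenated into one context string.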
def retrieve_from_pinecone(query, top_k=5):
    query_embedding = embed_text(query)
    results = index.query(
        namespace=PINECONE_NAMESPACE,
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )
    retrieved_chunks = [match["metadata"]["chunk"] for match in results["matches"]]
    return "\n".join(retrieved_chunks)

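
# count_tokens returns 0 as a sentinel when the Gemini quota is exhausted; callers treat 0 as
# "quota hit" rather than as a real token count.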
def count_tokens(text):
    try:
        response = gemini_model.count_tokens(text)
        return response.total_tokens
    except exceptions.ResourceExhausted:
        # Quota errors from the Gemini API surface as google.api_core ResourceExhausted (HTTP 429).
        return 0


def generate_answer(query, context):
    prompt = f"""
You are a diabetes research assistant. Answer the following question based on the provided context. If the context is insufficient, use your knowledge to provide a helpful answer, but note if the information might be limited.

**Question**: {query}

**Context**:
{context}

**Answer**:
"""
    try:
        response = gemini_model.generate_content(prompt)
        return response.text
    except exceptions.ResourceExhausted as e:
        return f"Error: Gemini API quota exceeded ({str(e)}). Try again later."
    except Exception as e:
        return f"Error generating answer: {str(e)}"

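
# diabetes_bot is the end-to-end pipeline: enforce usage caps, build context from the optional PDF
# and Pinecone, count input tokens, record usage, generate the answer, then append a usage summary.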
def diabetes_bot(query, pdf_file=None):
    # Stop early if the self-imposed free-tier limits are already close to being hit.
    can_proceed, usage_info = check_usage()
    if not can_proceed:
        return usage_info

    # Optional context from the uploaded PDF.
    pdf_context = ""
    if pdf_file is not None:
        pdf_context = extract_pdf_text(pdf_file)
        if pdf_context:
            pdf_context = f"Uploaded PDF content:\n{pdf_context}\n\n"

    # Context retrieved from Pinecone.
    pinecone_context = retrieve_from_pinecone(query)
    if pinecone_context:
        pinecone_context = f"Pinecone retrieved content (latest research, 2010 onward):\n{pinecone_context}\n\n"

    full_context = pdf_context + pinecone_context
    if not full_context.strip():
        full_context = "No relevant context found in Pinecone or uploaded PDF."

    # This prompt mirrors the one built inside generate_answer; it is used only to count input tokens.
    prompt = f"""
You are a diabetes research assistant. Answer the following question based on the provided context. If the context is insufficient, use your knowledge to provide a helpful answer, but note if the information might be limited.

**Question**: {query}

**Context**:
{full_context}

**Answer**:
"""
    input_tokens = count_tokens(prompt)
    if input_tokens == 0:
        return "Error: Gemini API quota exceeded while counting tokens. Try again later."

    # Record this request and its input tokens before calling the model.
    usage = load_usage()
    now = time.time()
    usage["requests"].append(now)
    usage["tokens"].append((now, input_tokens))
    save_usage(usage["requests"], usage["tokens"])

    answer = generate_answer(query, full_context)

    # Record output tokens as well.
    output_tokens = count_tokens(answer)
    if output_tokens == 0:
        return answer + "\n\nError: Gemini API quota exceeded while counting output tokens. Usage stats may be incomplete."
    usage = load_usage()
    usage["tokens"].append((now, output_tokens))
    save_usage(usage["requests"], usage["tokens"])

    # Append a usage summary. If this request pushed usage over a warning threshold, check_usage
    # returns a message instead of counts, so show that message rather than unpacking it.
    can_proceed, usage_info = check_usage()
    if can_proceed:
        rpd, rpm, tpm = usage_info
        usage_message = f"\n\nUsage: {rpd}/{FREE_TIER_RPD_LIMIT} requests today, {rpm}/{FREE_TIER_RPM_LIMIT} requests this minute, {tpm}/{FREE_TIER_TPM_LIMIT} tokens this minute."
    else:
        usage_message = f"\n\n{usage_info}"

    return answer + usage_message

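
# chat_wrapper adapts diabetes_bot to the Gradio click event: it appends the new user/assistant
# turns to the message history and returns "" and None to clear the textbox and PDF upload.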
def chat_wrapper(query, pdf, history):
    if history is None:
        history = []

    if query.strip() == "":
        return history, "", None, history

    answer = diabetes_bot(query, pdf)

    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": answer})

    return history, "", None, history


def clear_all():
    return [], "", None, []

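
# Gradio UI: a messages-format Chatbot plus a query box, optional PDF upload, and Ask/Clear buttons;
# gr.State keeps the running conversation history across turns.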
with gr.Blocks() as app:
    gr.HTML(
        """
        <h1 style="text-align:center;">Diabetes Research ChatBot Powered By Gemini 2.0 Flash And Pinecone 🩺</h1>
        <p style="text-align:center;"><strong>Powered by the latest diabetes research, running on the Gemini 2.0 Flash API</strong></p>
        <p style="text-align:center;">Ask questions about diabetes directly or upload a research paper (up to 10 pages) for specific Q&A.</p>
        <br>
        <div style="border: 1px solid #ccc; border-radius: 5px; padding: 10px; background-color: #f9f9f9; margin:auto; width:80%;">
            <strong>Disclaimer:</strong>
            The information provided by this chatbot is for research and informational purposes only and is not intended to be a substitute for professional medical advice, diagnosis, or treatment. Always seek the advice of your physician or other qualified health provider with any questions you may have regarding a medical condition.
        </div>
        <br>
        """
    )

    chatbot = gr.Chatbot(label="Conversation", type="messages", height=370)

    with gr.Row():
        query_input = gr.Textbox(label="Ask a Question", placeholder="Type your diabetes-related query here...", lines=2)
        with gr.Column(scale=0.2):
            pdf_input = gr.File(label="Upload a PDF (optional, max 10 pages)", file_types=[".pdf"])

    with gr.Row():
        submit_button = gr.Button("Ask", variant="primary")
        clear_button = gr.Button("Clear")

    # Session state holding the running message history.
    state = gr.State([])

    submit_button.click(
        fn=chat_wrapper,
        inputs=[query_input, pdf_input, state],
        outputs=[chatbot, query_input, pdf_input, state]
    )

    clear_button.click(
        fn=clear_all,
        inputs=[],
        outputs=[chatbot, query_input, pdf_input, state]
    )

app.launch()