# Tokentesting / app.py
# frankai98's picture
# Update app.py
# 2553077 verified
# raw
# history blame
# 8.58 kB
import os
import nest_asyncio
nest_asyncio.apply()
import streamlit as st
from transformers import pipeline, AutoTokenizer
from huggingface_hub import login
from streamlit.components.v1 import html
import pandas as pd
import torch
import random
# Read the Hugging Face access token from the environment; the app cannot
# continue without it.
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
    # Show the problem in the UI and halt this script run.
    st.error("Hugging Face token not found. Please set the HF_TOKEN environment variable.")
    st.stop()

# Log in to the Hugging Face Hub with the token.
login(token=hf_token)
# Initialize session state for timer
#if 'timer_started' not in st.session_state:
#st.session_state.timer_started = False
#if 'timer_frozen' not in st.session_state:
#st.session_state.timer_frozen = False
# Timer component using HTML and JavaScript
def timer():
    """Return the HTML/JavaScript markup for the elapsed-time widget.

    On load the embedded script removes the ``freezeTimer`` localStorage key
    and starts a one-second interval. Each tick either rewrites the ``#timer``
    element with the time elapsed since load (``MM:SS``) or, once
    ``freezeTimer`` equals ``"true"``, clears the interval and turns the text
    green.
    """
    widget_markup = """
<div id="timer" style="font-size:16px;color:#666;margin-bottom:10px;">⏱️ Elapsed: 00:00</div>
<script>
(function() {
var start = Date.now();
var timerElement = document.getElementById('timer');
localStorage.removeItem("freezeTimer");
var interval = setInterval(function() {
if(localStorage.getItem("freezeTimer") === "true"){
clearInterval(interval);
timerElement.style.color = '#00cc00';
return;
}
var elapsed = Date.now() - start;
var minutes = Math.floor(elapsed / 60000);
var seconds = Math.floor((elapsed % 60000) / 1000);
timerElement.innerHTML = '⏱️ Elapsed: ' +
(minutes < 10 ? '0' : '') + minutes + ':' +
(seconds < 10 ? '0' : '') + seconds;
}, 1000);
})();
</script>
"""
    return widget_markup
# Configure the browser tab title and icon. The icon literal was mojibake
# ("πŸ“", a mis-decoded UTF-8 emoji); it is restored to the memo emoji.
# NOTE(review): the original emoji is inferred from the garbled bytes; confirm.
st.set_page_config(page_title="Review Scorer & Report Generator", page_icon="📝")
st.header("Review Scorer & Report Generator")
# Concise introduction
st.write("This model will score your reviews in your CSV file and generate a report based on your query and those results.")
# Load models with caching to avoid reloading on every run
@st.cache_resource
def load_models():
    """Load the sentiment classifier and the Gemma text generator.

    Each pipeline is loaded in its own ``try`` block so a failure in one is
    reported in the UI without preventing the other from loading.

    Returns:
        tuple: ``(score_pipe, gemma_pipe)``. Either element is ``None`` when
        the corresponding model failed to load.
    """
    # Resolve the device once so both pipelines agree. The Gemma pipeline used
    # to hard-code device=0, which fails on hosts without a GPU.
    device = 0 if torch.cuda.is_available() else -1

    score_pipe = None
    gemma_pipe = None

    try:
        st.info("Loading sentiment analysis model...")
        score_pipe = pipeline(
            "text-classification",
            model="cardiffnlp/twitter-roberta-base-sentiment-latest",
            device=device,
        )
        st.success("Sentiment analysis model loaded successfully!")
    except Exception as e:
        # UI boundary: report the failure and continue with score_pipe = None.
        st.error(f"Error loading score model: {e}")

    try:
        st.info("Loading Gemma model...")
        # Load the tokenizer separately with the chat template
        tokenizer = AutoTokenizer.from_pretrained("google/gemma-3-1b-it")
        gemma_pipe = pipeline(
            "text-generation",
            model="google/gemma-3-1b-it",
            tokenizer=tokenizer,  # Pass the loaded tokenizer here
            device=device,
            torch_dtype=torch.bfloat16,
        )
        st.success("Gemma model loaded successfully!")
    except Exception as e:
        # UI boundary: report the failure and continue with gemma_pipe = None.
        st.error(f"Error loading Gemma model: {e}")
        st.error(f"Detailed error: {type(e).__name__}: {str(e)}")

    # NOTE(review): the returned tuple is cached by st.cache_resource, so a
    # failed load (None entries) presumably persists until the cache is
    # cleared or the app restarts; confirm this is acceptable.
    return score_pipe, gemma_pipe
def _assistant_content_from_messages(raw_response):
    """Return the assistant reply from structured pipeline output, or None.

    Accepts a single result dict or a list/tuple of them. A result is expected
    to carry a ``"generated_text"`` entry that is either a plain string or a
    list of ``{"role": ..., "content": ...}`` message dicts.
    """
    if isinstance(raw_response, (list, tuple)):
        candidates = raw_response
    else:
        candidates = [raw_response]

    for candidate in candidates:
        if not isinstance(candidate, dict):
            continue
        generated = candidate.get("generated_text")
        if isinstance(generated, str):
            return generated
        if isinstance(generated, (list, tuple)):
            # The assistant's reply is the last assistant-role message.
            for message in reversed(generated):
                if not isinstance(message, dict):
                    continue
                if message.get("role") != "assistant":
                    continue
                content = message.get("content")
                if isinstance(content, str):
                    return content
    return None


def _assistant_content_from_repr(response_str):
    """Scrape the assistant content out of a stringified response (legacy path)."""
    assistant_marker = "'role': 'assistant', 'content': '"
    marker_pos = response_str.find(assistant_marker)
    if marker_pos == -1:
        # No marker present: hand back the text unchanged.
        return response_str

    content = response_str[marker_pos + len(assistant_marker):]
    # Cut at the earliest of the last-found closing markers.
    end_idx = len(content)
    for end_marker in ("'}", "'}]"):
        pos = content.rfind(end_marker)
        if pos != -1 and pos < end_idx:
            end_idx = pos
    return content[:end_idx]


def extract_assistant_content(raw_response):
    """Extract only the assistant's content from the Gemma-3 response.

    The structured output (list of dicts with a ``"generated_text"`` message
    list) is read directly. Scraping ``str(raw_response)`` alone is unreliable:
    a reply containing an apostrophe is repr'd with double quotes, so the
    marker never matches, and newlines come back as literal ``\\n`` sequences.

    Args:
        raw_response: The text-generation pipeline output, or any object whose
            string form may contain an assistant message.

    Returns:
        str: The assistant's reply when it can be located; otherwise the
        string form of ``raw_response``.
    """
    structured = _assistant_content_from_messages(raw_response)
    if structured is not None:
        return structured
    # Fallback for inputs that are already strings or have an unexpected shape.
    return _assistant_content_from_repr(str(raw_response))
score_pipe, gemma_pipe = load_models()

# Tunables for the scoring and report stages.
SCORER_MAX_TOKENS = 512          # cap tokenized review length for the classifier
MAX_REVIEWS_FOR_PROMPT = 10000   # above this many scored reviews, sample for the prompt
PROMPT_SAMPLE_SIZE = 1000        # number of reviews kept when sampling
REPORT_MAX_NEW_TOKENS = 512      # enough room for the ~300-word report requested below

# Input: Query text for scoring and CSV file upload for candidate reviews
query_input = st.text_area("Enter your query text for analysis (this does not need to be part of the CSV):")
uploaded_file = st.file_uploader("Upload Reviews CSV File (must contain a 'reviewText' column)", type=["csv"])

if score_pipe is None or gemma_pipe is None:
    st.error("Model loading failed. Please check your model names, token permissions, and GPU configuration.")
else:
    # Parse the uploaded CSV on every rerun so the button handler sees the reviews.
    candidate_docs = []
    if uploaded_file is not None:
        try:
            df = pd.read_csv(uploaded_file)
            if 'reviewText' not in df.columns:
                st.error("CSV must contain a 'reviewText' column.")
            else:
                candidate_docs = df['reviewText'].dropna().astype(str).tolist()
        except Exception as e:
            # UI boundary: report unreadable files instead of crashing the run.
            st.error(f"Error reading CSV file: {e}")

    if st.button("Generate Report"):
        # Reset timer state so that the timer always shows up
        st.session_state.timer_started = False
        st.session_state.timer_frozen = False

        if uploaded_file is None:
            st.error("Please upload a CSV file.")
        elif not candidate_docs:
            # Reached when the column is missing, empty, or the file was unreadable.
            st.error("No reviews found. The CSV must contain a non-empty 'reviewText' column.")
        elif not query_input.strip():
            st.error("Please enter a query text!")
        else:
            if not st.session_state.timer_started and not st.session_state.timer_frozen:
                st.session_state.timer_started = True
                html(timer(), height=50)

            status_text = st.empty()
            progress_bar = st.progress(0)

            # Stage 1: Score candidate documents.
            status_text.markdown("**🔍 Scoring candidate documents...**")

            total_docs = len(candidate_docs)
            update_every = max(1, total_docs // 10)

            # Process each review individually to avoid memory issues
            scored_docs = []
            for i, doc in enumerate(candidate_docs):
                # First half of the progress bar (0-50%) tracks scoring.
                progress_bar.progress(int((i / total_docs) * 50))

                # Truncate long reviews so they cannot exceed the classifier's
                # input limit; max_length is passed explicitly rather than
                # relying on the tokenizer's configured default.
                result = score_pipe(doc, truncation=True, max_length=SCORER_MAX_TOKENS)[0]

                # Keep the predicted label with its confidence: the score alone
                # does not say which sentiment it refers to.
                scored_docs.append((doc, result["label"], result["score"]))

                # Display occasional status updates for large datasets
                if i % update_every == 0:
                    status_text.markdown(f"**🔍 Scoring documents... ({i}/{total_docs})**")

            progress_bar.progress(67)

            # Stage 2: Generate Report using Gemma in the new messages format.
            status_text.markdown("**📝 Generating report with Gemma...**")

            # For very large datasets, sample the scored reviews before sending to Gemma.
            sampled_docs = scored_docs
            if len(scored_docs) > MAX_REVIEWS_FOR_PROMPT:
                sampled_docs = random.sample(scored_docs, PROMPT_SAMPLE_SIZE)

            # Build the chat prompt from the query and the (possibly sampled)
            # results. The prompt previously interpolated scored_docs, so the
            # sampling above had no effect.
            messages = [
                {"role": "user", "content": f"""
Generate a concise 300-word report based on the following analysis without repeating what's in the analysis.
Query:
"{query_input}"
Candidate Reviews with their sentiment labels and scores (review, label, confidence):
{sampled_docs}
"""}
            ]

            raw_result = gemma_pipe(messages, max_new_tokens=REPORT_MAX_NEW_TOKENS)
            report = extract_assistant_content(raw_result)

            progress_bar.progress(100)
            status_text.success("**✅ Generation complete!**")

            # Signal the timer script (via localStorage) to stop counting.
            html("<script>localStorage.setItem('freezeTimer', 'true');</script>", height=0)
            st.session_state.timer_frozen = True

            #st.write("**Scored Candidate Reviews:**", scored_docs)
            st.write("**Generated Report:**", report)