import os
import time

import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceEndpoint
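
# Assumed dependency set (the original pins no versions): gradio, torch,
# transformers, langchain, langchain-community, plus faiss-cpu and
# sentence-transformers, which FAISS and HuggingFaceEmbeddings need at runtime.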

# Load the three classifiers used below: sentiment, sarcasm, and review-category.
sentiment_tokenizer = AutoTokenizer.from_pretrained("dnzblgn/Sentiment-Analysis-Customer-Reviews")
sentiment_model = AutoModelForSequenceClassification.from_pretrained("dnzblgn/Sentiment-Analysis-Customer-Reviews")

sarcasm_tokenizer = AutoTokenizer.from_pretrained("dnzblgn/Sarcasm-Detection-Customer-Reviews")
sarcasm_model = AutoModelForSequenceClassification.from_pretrained("dnzblgn/Sarcasm-Detection-Customer-Reviews")

doc_tokenizer = AutoTokenizer.from_pretrained("dnzblgn/Customer-Reviews-Classification")
doc_model = AutoModelForSequenceClassification.from_pretrained("dnzblgn/Customer-Reviews-Classification")
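
# The models stay on CPU as loaded above. A minimal GPU sketch, assuming CUDA
# is available (the tokenized inputs below would also need .to(device)):
#
#     device = "cuda" if torch.cuda.is_available() else "cpu"
#     for m in (sentiment_model, sarcasm_model, doc_model):
#         m.to(device)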

# Category indices emitted by the document classifier and their names.
label_mapping = {
    "shipping_and_delivery": 0,
    "customer_service": 1,
    "price_and_value": 2,
    "quality_and_performance": 3,
    "use_and_design": 4,
    "other": 5
}
reverse_label_mapping = {v: k for k, v in label_mapping.items()}
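

# All three classifiers below share the same tokenize -> forward -> argmax
# steps. A small helper capturing that pattern (illustrative only; the
# pipeline functions below inline the same code, so nothing depends on it):
def classify_index(text, tokenizer, model):
    """Return the argmax class index for `text` under the given model."""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    with torch.no_grad():
        logits = model(**inputs).logits
    return torch.argmax(logits, dim=-1).item()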


def analyze_reviews(reviews):
    """Classify each review and tally sentiment overall and per category."""
    analysis = {
        "overall": {"positive": 0, "negative": 0},
        "categories": {label: {"positive": 0, "negative": 0} for label in label_mapping}
    }

    for review in reviews:
        # Sentiment: this model maps class 0 to positive, class 1 to negative.
        sentiment_inputs = sentiment_tokenizer(review, return_tensors="pt", truncation=True, padding=True, max_length=512)
        with torch.no_grad():
            sentiment_outputs = sentiment_model(**sentiment_inputs)
        sentiment_class = torch.argmax(sentiment_outputs.logits, dim=-1).item()
        sentiment = "positive" if sentiment_class == 0 else "negative"

        # Sarcasm check: a "positive" review flagged as sarcastic (class 1)
        # is treated as negative.
        if sentiment == "positive":
            sarcasm_inputs = sarcasm_tokenizer(review, return_tensors="pt", truncation=True, padding=True, max_length=512)
            with torch.no_grad():
                sarcasm_outputs = sarcasm_model(**sarcasm_inputs)
            sarcasm_class = torch.argmax(sarcasm_outputs.logits, dim=-1).item()
            if sarcasm_class == 1:
                sentiment = "negative"

        # Topic: map the document classifier's index back to a category name.
        doc_inputs = doc_tokenizer(review, return_tensors="pt", truncation=True, padding=True, max_length=512)
        with torch.no_grad():
            doc_outputs = doc_model(**doc_inputs)
        category_class = torch.argmax(doc_outputs.logits, dim=-1).item()
        category = reverse_label_mapping[category_class]

        analysis["overall"][sentiment] += 1
        analysis["categories"][category][sentiment] += 1

    return analysis
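
# Example with hypothetical reviews (result shape only; actual labels depend
# on the models):
#
#     analyze_reviews(["Great quality!", "Arrived late and broken."])
#     # -> {"overall": {"positive": 1, "negative": 1},
#     #     "categories": {"shipping_and_delivery": {"positive": 0, "negative": 1}, ...}}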


def generate_analysis_document(analysis):
    """Render the tallies as plain text, with '--END--' marking section breaks."""
    total_reviews = analysis["overall"]["positive"] + analysis["overall"]["negative"]
    overall_positive = analysis["overall"]["positive"]
    overall_negative = analysis["overall"]["negative"]

    doc = [
        "Overall Sentiment Analysis:",
        f"Positive Feedback: {overall_positive} comments ({(overall_positive / total_reviews) * 100:.0f}%)",
        f"Negative Feedback: {overall_negative} comments ({(overall_negative / total_reviews) * 100:.0f}%)",
        "--END--",
        "Category-Specific Analysis:",
        "--END--"
    ]

    for category, feedback in analysis["categories"].items():
        total_category = feedback["positive"] + feedback["negative"]
        positive_rate = (feedback["positive"] / total_category) * 100 if total_category > 0 else 0
        negative_rate = (feedback["negative"] / total_category) * 100 if total_category > 0 else 0
        doc.extend([
            f"{category.replace('_', ' ').title()}:",
            f"- Positive Feedback: {feedback['positive']} comments ({positive_rate:.0f}%)",
            f"- Negative Feedback: {feedback['negative']} comments ({negative_rate:.0f}%)",
            "--END--"
        ])

    return "\n".join(doc)
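
# Illustrative output for a hypothetical two-review run:
#
#     Overall Sentiment Analysis:
#     Positive Feedback: 1 comments (50%)
#     Negative Feedback: 1 comments (50%)
#     --END--
#     Category-Specific Analysis:
#     --END--
#     Shipping And Delivery:
#     - Positive Feedback: 0 comments (0%)
#     - Negative Feedback: 1 comments (100%)
#     --END--
#     ... (remaining categories follow the same pattern)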


def write_analysis_to_file(analysis_document):
    with open("processed_analysis.txt", "w", encoding="utf-8") as f:
        f.write(analysis_document)
    return "processed_analysis.txt"


def read_processed_file():
    with open("processed_analysis.txt", "r", encoding="utf-8") as f:
        return f.read()


def create_db_from_analysis(analysis_document):
    """Chunk the analysis text and index it in an in-memory FAISS store."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1024, chunk_overlap=64
    )
    splits = text_splitter.create_documents([analysis_document])
    # HuggingFaceEmbeddings defaults to sentence-transformers/all-mpnet-base-v2.
    embeddings = HuggingFaceEmbeddings()
    vector_db = FAISS.from_documents(splits, embeddings)
    return vector_db
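
# To pin the embedding model explicitly (all-MiniLM-L6-v2 here is just one
# common lightweight choice, not what the original used):
#
#     embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")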


def initialize_chatbot(vector_db):
    """Build a ConversationalRetrievalChain over the FAISS store."""
    # The memory supplies "chat_history" to the chain on every call.
    memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    retriever = vector_db.as_retriever()

    # Hosted inference; requires HUGGINGFACE_API_TOKEN in the environment.
    llm = HuggingFaceEndpoint(
        repo_id="mistralai/Mistral-7B-Instruct-v0.2",
        huggingfacehub_api_token=os.environ.get("HUGGINGFACE_API_TOKEN"),
        temperature=0.5,
        max_new_tokens=256
    )

    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        memory=memory,
        verbose=False
    )
    return qa_chain
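
# Quick smoke test, assuming the token is set and an index can be built:
#
#     qa = initialize_chatbot(create_db_from_analysis("Positive Feedback: 10 comments"))
#     print(qa.invoke({"question": "How many positive comments were there?"})["answer"])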


def process_and_initialize(file):
    """Read a review file, run the analysis, and build the retrieval chain."""
    if file is None:
        return None, None, "Please upload a file first."

    try:
        # gr.File(type="filepath") hands us a path string, not a file object.
        if not os.path.exists(file):
            return None, None, "File not found. Please try uploading again."

        with open(file, 'r', encoding='utf-8') as f:
            reviews = [line.strip() for line in f if line.strip()]

        if not reviews:
            return None, None, "File is empty. Please upload a file with reviews."

        analysis = analyze_reviews(reviews)
        analysis_doc = generate_analysis_document(analysis)

        write_analysis_to_file(analysis_doc)
        processed_content = read_processed_file()

        db = create_db_from_analysis(processed_content)
        qa = initialize_chatbot(db)

        return db, qa, f"Successfully processed {len(reviews)} reviews! Ready for questions."

    except Exception as e:
        return None, None, f"Processing error: {str(e)}"
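
# Expected upload format: plain text, one review per line, e.g.
#
#     Great product, fast shipping!
#     The handle broke after two days.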


def user_query_typing_effect(query, qa_chain, chatbot):
    """Stream the answer character by character; because this is a generator,
    Gradio re-renders the Chatbot on each yield, producing a typing effect."""
    history = chatbot or []
    if qa_chain is None:
        history.append({"role": "assistant", "content": "Please process a review file first."})
        yield history, ""
        return
    try:
        # The chain's memory tracks chat history, so only the question is passed.
        response = qa_chain.invoke({"question": query})
        assistant_response = response["answer"]

        history.append({"role": "user", "content": query})
        history.append({"role": "assistant", "content": ""})

        for char in assistant_response:
            history[-1]["content"] += char
            yield history, ""
            time.sleep(0.05)
    except Exception as e:
        history.append({"role": "assistant", "content": f"Error: {str(e)}"})
        yield history, ""


def demo():
    custom_css = """
    body {
        background-color: #FF8C00;
        font-family: Arial, sans-serif;
    }
    .gradio-container {
        border-radius: 15px;
        box-shadow: 0px 4px 20px rgba(0, 0, 0, 0.3);
        padding: 20px;
    }
    footer {
        visibility: hidden;
    }
    .chatbot {
        border: 2px solid #000;
        border-radius: 10px;
        background-color: #FFF5E1;
    }
    """

    with gr.Blocks(css=custom_css) as app:
        # Per-session state: the FAISS store and the retrieval chain.
        vector_db = gr.State(None)
        qa_chain = gr.State(None)

        gr.Markdown("### **Customer Review Analysis and Chatbot**")
        gr.Markdown("#### Upload your review file and ask questions interactively!")

        with gr.Row():
            with gr.Column(scale=1):
                txt_file = gr.File(
                    label="Upload Reviews",
                    file_types=[".txt"],
                    type="filepath"
                )
                analyze_btn = gr.Button("Process Reviews")
                status = gr.Textbox(
                    label="Status",
                    placeholder="Status updates will appear here...",
                    interactive=False
                )

            with gr.Column(scale=3):
                # type="messages" matches the role/content dicts yielded above.
                chatbot = gr.Chatbot(
                    label="Chat with your data",
                    height=600,
                    bubble_full_width=False,
                    show_label=False,
                    render_markdown=True,
                    type="messages",
                    elem_classes=["chatbot"]
                )
                query_input = gr.Textbox(
                    label="Ask a question",
                    placeholder="Ask about the reviews...",
                    show_label=False,
                    container=False
                )
                query_btn = gr.Button("Ask")

        analyze_btn.click(
            fn=process_and_initialize,
            inputs=[txt_file],
            outputs=[vector_db, qa_chain, status],
            show_progress="minimal"
        )

        # The Ask button and pressing Enter in the textbox share one handler.
        query_btn.click(
            fn=user_query_typing_effect,
            inputs=[query_input, qa_chain, chatbot],
            outputs=[chatbot, query_input],
            show_progress="minimal"
        )

        query_input.submit(
            fn=user_query_typing_effect,
            inputs=[query_input, qa_chain, chatbot],
            outputs=[chatbot, query_input],
            show_progress="minimal"
        )

    app.launch()


if __name__ == "__main__":
    demo()