import gradio as gr
import os
import json
import pandas as pd
import numpy as np
import datetime
import plotly.express as px
import plotly.graph_objects as go
import msal
import requests
import tqdm
import tempfile
import time
from typing import List, Dict, Any, Tuple, Optional
# Configuration
MS_CLIENT_ID = os.getenv("MS_CLIENT_ID", "ff0d5b77-56a9-4fa0-bd59-5c7b4889186e")
MS_TENANT_ID = os.getenv("MS_TENANT_ID", "677c00b7-cf19-4fef-9962-132a076ae325")
MS_AUTHORITY = f"https://login.microsoftonline.com/{MS_TENANT_ID}"
MS_REDIRECT_URI = os.getenv("MS_REDIRECT_URI", "https://huggingface.co/spaces/YOUR-USERNAME/email-thread-analyzer/")
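# NOTE: the redirect URI must match one registered on the Azure app registration;
# after sign-in the user copies the authorization code from the redirect URL into the UI below.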
# Microsoft Graph API scopes
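# Delegated permissions the user consents to during interactive sign-in.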
SCOPES = [
"User.Read",
"Mail.Read",
"Mail.ReadBasic",
]
# Global variables
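# Module-level state shared across Gradio callbacks; adequate for a single-user demo,
# but not safe for concurrent users of the Space.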
auth_app = None
current_user = None
user_token = None
emails = []
email_threads = {}
search_results = []
qa_data = {}
# Initialize MSAL app
def init_auth_app():
global auth_app
auth_app = msal.PublicClientApplication(
client_id=MS_CLIENT_ID,
authority=MS_AUTHORITY
)
# Get authorization URL
def get_auth_url():
auth_url = auth_app.get_authorization_request_url(
scopes=SCOPES,
redirect_uri=MS_REDIRECT_URI,
state="state"
)
return auth_url
# Process auth code
def process_auth_code(auth_code):
global current_user, user_token
try:
# Acquire token
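        # Exchange the one-time authorization code for an access token via MSAL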
token_response = auth_app.acquire_token_by_authorization_code(
code=auth_code,
scopes=SCOPES,
redirect_uri=MS_REDIRECT_URI
)
if "error" in token_response:
return f"Error: {token_response['error_description']}"
# Store token
user_token = token_response
# Get user info
user_response = requests.get(
"https://graph.microsoft.com/v1.0/me",
headers={"Authorization": f"Bearer {user_token['access_token']}"}
)
if user_response.status_code == 200:
current_user = user_response.json()
return f"Successfully authenticated as {current_user['displayName']}"
else:
return f"Error getting user info: {user_response.text}"
except Exception as e:
return f"Error during authentication: {str(e)}"
# Get mail folders
def get_mail_folders():
if not user_token:
return [], "Not authenticated"
try:
response = requests.get(
"https://graph.microsoft.com/v1.0/me/mailFolders",
headers={"Authorization": f"Bearer {user_token['access_token']}"}
)
if response.status_code == 200:
folders = response.json()["value"]
return [(folder["displayName"], folder["id"]) for folder in folders], None
else:
return [], f"Error: {response.text}"
except Exception as e:
return [], f"Error: {str(e)}"
# Extract emails from folder
def extract_emails(folder_id, max_emails=100, batch_size=25, start_date=None, end_date=None):
global emails, email_threads
if not user_token:
return "Not authenticated"
    try:
        # Gradio Number inputs may arrive as floats; cast so range() stepping works
        max_emails = int(max_emails)
        batch_size = int(batch_size)
        # Reset data
        emails = []
        email_threads = {}
# Prepare filter
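        # Graph expects ISO 8601 UTC timestamps in $filter expressions on receivedDateTime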
filter_query = ""
if start_date and end_date:
start_date_iso = datetime.datetime.strptime(start_date, "%Y-%m-%d").isoformat() + "Z"
end_date_iso = datetime.datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "Z"
filter_query = f"receivedDateTime ge {start_date_iso} and receivedDateTime le {end_date_iso}"
# Extract emails in batches
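        # Page through the folder with $top/$skip; $select trims the payload to the fields used below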
for i in range(0, max_emails, batch_size):
# Prepare request
url = f"https://graph.microsoft.com/v1.0/me/mailFolders/{folder_id}/messages"
headers = {"Authorization": f"Bearer {user_token['access_token']}"}
params = {
"$select": "id,subject,sender,from,toRecipients,ccRecipients,receivedDateTime,conversationId,bodyPreview,uniqueBody",
"$top": batch_size,
"$skip": i
}
if filter_query:
params["$filter"] = filter_query
# Make request
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
return f"Error: {response.text}"
batch_emails = response.json()["value"]
if not batch_emails:
break
emails.extend(batch_emails)
if len(emails) >= max_emails:
emails = emails[:max_emails]
break
# Organize emails into threads
organize_email_threads()
return f"Successfully extracted {len(emails)} emails organized into {len(email_threads)} threads"
except Exception as e:
return f"Error: {str(e)}"
# Organize emails into threads
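# Graph gives every message in a reply chain the same conversationId, so it doubles as a thread key.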
def organize_email_threads():
global email_threads
threads = {}
for email in emails:
conversation_id = email["conversationId"]
if conversation_id not in threads:
threads[conversation_id] = []
threads[conversation_id].append(email)
# Sort emails within each thread by date
for thread_id, thread_emails in threads.items():
thread_emails.sort(key=lambda x: x["receivedDateTime"])
# Extract thread metadata
threads[thread_id] = {
"emails": thread_emails,
"subject": thread_emails[0]["subject"],
"start_date": thread_emails[0]["receivedDateTime"],
"end_date": thread_emails[-1]["receivedDateTime"],
"message_count": len(thread_emails),
"participants": get_unique_participants(thread_emails)
}
email_threads = threads
# Get unique participants
def get_unique_participants(thread_emails):
participants = set()
for email in thread_emails:
# Add sender
if "sender" in email and "emailAddress" in email["sender"]:
participants.add(email["sender"]["emailAddress"]["address"])
# Add recipients
if "toRecipients" in email:
for recipient in email["toRecipients"]:
participants.add(recipient["emailAddress"]["address"])
# Add CC recipients
if "ccRecipients" in email:
for recipient in email["ccRecipients"]:
participants.add(recipient["emailAddress"]["address"])
return list(participants)
# Search threads using simple keyword matching
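# Relevance is a raw term-frequency count over subject and bodyPreview text; no ranking model is involved.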
def search_threads(query):
global search_results
if not query or not email_threads:
search_results = []
return "Please enter a search query and ensure emails have been extracted"
try:
# Search terms
search_terms = query.lower().split()
# Calculate relevance scores
results = []
for thread_id, thread in email_threads.items():
# Prepare text content from thread
content = f"{thread['subject'].lower()} "
for email in thread["emails"]:
content += f"{email['bodyPreview'].lower()} "
# Calculate score based on term frequency
score = 0
for term in search_terms:
score += content.count(term)
if score > 0:
results.append((thread, score))
# Sort by score
results.sort(key=lambda x: x[1], reverse=True)
search_results = [thread for thread, _ in results]
if not search_results:
return "No relevant threads found"
return f"Found {len(search_results)} relevant threads"
except Exception as e:
search_results = []
return f"Error: {str(e)}"
# Generate Q&A for thread
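# Q&A here is template-based (no model call): questions and answers are filled in from
# thread metadata as a stand-in for AI-generated responses.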
def generate_qa(thread_id):
if thread_id not in email_threads:
return "Thread not found"
try:
thread = email_threads[thread_id]
# Create thread context
context = f"Thread subject: {thread['subject']}\n\n"
for email in thread["emails"]:
sender = email["sender"]["emailAddress"]["address"]
content += f"From: {sender}\n"
content += f"Date: {email['receivedDateTime']}\n"
content += f"Content: {email['bodyPreview']}\n\n"
# Generate sample questions
questions = [
f"What is the main topic of this email thread about '{thread['subject']}'?",
"Who are the key participants in this conversation?",
"What was the timeline of this discussion?",
"What were the main points discussed in this thread?"
]
# Generate simple answers (simulating AI responses)
answers = [
f"The main topic appears to be '{thread['subject']}', which discusses project-related matters.",
f"The key participants include {', '.join(thread['participants'][:3])}" +
(f" and {len(thread['participants']) - 3} others" if len(thread['participants']) > 3 else ""),
f"The conversation started on {thread['start_date'].split('T')[0]} and the last message was on {thread['end_date'].split('T')[0]}.",
"The main points include updates on project status, discussion of requirements, and next steps."
]
# Create summary
summary = f"This is an email thread with {thread['message_count']} messages about '{thread['subject']}'. "
summary += f"The conversation started on {thread['start_date'].split('T')[0]} and ended on {thread['end_date'].split('T')[0]}. "
summary += f"There are {len(thread['participants'])} participants in this thread."
# Store Q&A data
qa_data[thread_id] = {
"questions": questions,
"answers": answers,
"summary": summary
}
return f"Generated {len(questions)} Q&A pairs for thread"
except Exception as e:
return f"Error generating Q&A: {str(e)}"
# Get thread size distribution
def get_thread_size_distribution():
if not email_threads:
return None
# Count threads by size
sizes = {}
for thread in email_threads.values():
size = thread["message_count"]
if size in sizes:
sizes[size] += 1
else:
sizes[size] = 1
# Convert to dataframe
df = pd.DataFrame([
{"Size": size, "Count": count}
for size, count in sizes.items()
])
# Sort by size
df = df.sort_values("Size")
# Create chart
fig = px.bar(df, x="Size", y="Count", title="Thread Size Distribution")
return fig
# Get activity over time
def get_activity_over_time():
if not emails:
return None
# Count emails by date
dates = {}
for email in emails:
date = email["receivedDateTime"].split("T")[0]
if date in dates:
dates[date] += 1
else:
dates[date] = 1
# Convert to dataframe
df = pd.DataFrame([
{"Date": date, "Count": count}
for date, count in dates.items()
])
# Sort by date
df = df.sort_values("Date")
# Create chart
fig = px.line(df, x="Date", y="Count", title="Activity Over Time")
return fig
# Get participant activity
def get_participant_activity():
if not emails:
return None
# Count emails by sender
senders = {}
for email in emails:
if "sender" in email and "emailAddress" in email["sender"]:
sender = email["sender"]["emailAddress"]["address"]
if sender in senders:
senders[sender] += 1
else:
senders[sender] = 1
# Convert to dataframe
df = pd.DataFrame([
{"Participant": sender, "Count": count}
for sender, count in senders.items()
])
# Sort by count
df = df.sort_values("Count", ascending=False).head(10)
# Create chart
fig = px.bar(df, x="Count", y="Participant", title="Top 10 Participants", orientation='h')
return fig
# Export thread data with Q&A
def export_thread_data(thread_id):
if thread_id not in email_threads:
return None
thread = email_threads[thread_id]
qa = qa_data.get(thread_id, {"questions": [], "answers": [], "summary": ""})
export_data = {
"subject": thread["subject"],
"start_date": thread["start_date"],
"end_date": thread["end_date"],
"message_count": thread["message_count"],
"participants": thread["participants"],
"emails": [
{
"sender": email["sender"]["emailAddress"]["address"],
"received_date_time": email["receivedDateTime"],
"subject": email["subject"],
"body_preview": email["bodyPreview"]
}
for email in thread["emails"]
],
"qa": qa
}
# Save to temporary file
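    # delete=False keeps the file on disk so Gradio's File component can serve it for download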
with tempfile.NamedTemporaryFile(delete=False, suffix='.json', mode='w') as f:
json.dump(export_data, f, indent=2)
return f.name
# Initialize
init_auth_app()
# Create the Gradio interface
with gr.Blocks(title="Email Thread Analyzer with AI Q&A") as demo:
gr.Markdown("# Email Thread Analyzer with AI Q&A")
# Authentication section
with gr.Tab("Authentication"):
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("## Sign in with Microsoft")
gr.Markdown("1. Click 'Get Authentication URL' to start the sign-in process")
gr.Markdown("2. Copy the authorization code from the redirect URL")
gr.Markdown("3. Paste the code below and submit")
with gr.Column(scale=3):
auth_url_button = gr.Button("Get Authentication URL")
auth_url_output = gr.Textbox(label="Authentication URL", interactive=False)
auth_code_input = gr.Textbox(label="Authorization Code")
auth_submit = gr.Button("Submit Authorization Code")
auth_status = gr.Textbox(label="Authentication Status", interactive=False)
# Email Extraction section
with gr.Tab("Email Extraction"):
with gr.Row():
with gr.Column():
folder_dropdown = gr.Dropdown(label="Select Mail Folder")
refresh_folders_button = gr.Button("Refresh Folders")
with gr.Row():
max_emails_input = gr.Number(label="Max Emails", value=100, minimum=1, maximum=1000)
batch_size_input = gr.Number(label="Batch Size", value=25, minimum=1, maximum=100)
with gr.Row():
start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)")
end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)")
extract_button = gr.Button("Extract Emails")
extraction_status = gr.Textbox(label="Extraction Status", interactive=False)
# Thread Analysis section
with gr.Tab("Thread Analysis"):
with gr.Row():
with gr.Column():
analysis_status = gr.Textbox(label="Analysis Status")
with gr.Tabs():
with gr.Tab("Thread Size"):
thread_size_plot = gr.Plot(label="Thread Size Distribution")
with gr.Tab("Activity Over Time"):
activity_plot = gr.Plot(label="Activity Over Time")
with gr.Tab("Top Participants"):
participants_plot = gr.Plot(label="Top Participants")
generate_analytics_button = gr.Button("Generate Analytics")
# Search section
with gr.Tab("Search"):
with gr.Row():
with gr.Column():
search_input = gr.Textbox(label="Search Query")
search_button = gr.Button("Search")
search_status = gr.Textbox(label="Search Status", interactive=False)
with gr.Column():
search_results_dropdown = gr.Dropdown(label="Search Results")
view_thread_button = gr.Button("View Thread")
# Q&A section
with gr.Tab("Q&A"):
with gr.Row():
with gr.Column():
thread_info = gr.Textbox(label="Thread Information", interactive=False)
qa_status = gr.Textbox(label="Q&A Status", interactive=False)
with gr.Accordion("Thread Content", open=False):
thread_content = gr.Textbox(label="Thread Content", interactive=False, lines=10)
with gr.Row():
question_dropdown = gr.Dropdown(label="Questions")
gen_qa_button = gr.Button("Generate Q&A")
answer_output = gr.Textbox(label="Answer", interactive=False, lines=5)
summary_output = gr.Textbox(label="Summary", interactive=False, lines=5)
export_thread_button = gr.Button("Export Thread Data")
export_output = gr.File(label="Export Data")
# Set up event handlers
# Authentication events
auth_url_button.click(
fn=get_auth_url,
outputs=auth_url_output
)
auth_submit.click(
fn=process_auth_code,
inputs=auth_code_input,
outputs=auth_status
)
# Folder refresh event
    refresh_folders_button.click(
        fn=lambda: gr.update(choices=get_mail_folders()[0]),
        outputs=folder_dropdown
    )
# Email extraction event
extract_button.click(
fn=extract_emails,
inputs=[folder_dropdown, max_emails_input, batch_size_input, start_date_input, end_date_input],
outputs=extraction_status
)
# Analytics generation event
generate_analytics_button.click(
fn=lambda: (
"Analytics generated successfully",
get_thread_size_distribution(),
get_activity_over_time(),
get_participant_activity()
),
outputs=[analysis_status, thread_size_plot, activity_plot, participants_plot]
)
# Search events
    search_button.click(
        fn=lambda query: (
            search_threads(query),
            gr.update(
                choices=[f"{t['subject']} ({t['message_count']} messages)" for t in search_results],
                value=None
            )
        ),
        inputs=search_input,
        outputs=[search_status, search_results_dropdown]
    )
    # Thread view event
    def _selected_thread_index(selected_label):
        # Map the dropdown label back to its position in the current search results
        labels = [f"{t['subject']} ({t['message_count']} messages)" for t in search_results]
        return labels.index(selected_label) if selected_label in labels else -1

    def view_thread_details(selected_label):
        thread_idx = _selected_thread_index(selected_label)
        if thread_idx < 0:
            return "No thread selected", "", gr.update(choices=[], value=None), "", "", None
        thread = search_results[thread_idx]
        thread_id = thread["emails"][0]["conversationId"]
        # Build a readable transcript of the thread
        content = f"Subject: {thread['subject']}\n\n"
        for email in thread["emails"]:
            sender = email["sender"]["emailAddress"]["address"]
            date = email["receivedDateTime"]
            content += f"From: {sender} | Date: {date}\n"
            content += f"Content: {email['bodyPreview']}\n\n"
        # Generate Q&A if not already generated
        if thread_id not in qa_data:
            generate_qa(thread_id)
        questions = qa_data.get(thread_id, {}).get("questions", [])
        answer = qa_data.get(thread_id, {}).get("answers", [""])[0] if questions else ""
        summary = qa_data.get(thread_id, {}).get("summary", "")
        # Export data for download via the File component
        export_path = export_thread_data(thread_id)
        return (
            f"Thread: {thread['subject']} ({thread['message_count']} messages)",
            content,
            gr.update(choices=questions, value=questions[0] if questions else None),
            answer,
            summary,
            export_path
        )

    view_thread_button.click(
        fn=view_thread_details,
        inputs=search_results_dropdown,
        outputs=[thread_info, thread_content, question_dropdown, answer_output, summary_output, export_output]
    )

    # Q&A events
    def show_answer(question, selected_label):
        thread_idx = _selected_thread_index(selected_label)
        if thread_idx < 0:
            return ""
        thread_id = search_results[thread_idx]["emails"][0]["conversationId"]
        qa = qa_data.get(thread_id, {})
        questions = qa.get("questions", [])
        return qa.get("answers", [""])[questions.index(question)] if question in questions else ""

    question_dropdown.change(
        fn=show_answer,
        inputs=[question_dropdown, search_results_dropdown],
        outputs=answer_output
    )

    def run_generate_qa(selected_label):
        thread_idx = _selected_thread_index(selected_label)
        if thread_idx < 0:
            return "No thread selected", gr.update(choices=[], value=None), "", ""
        thread_id = search_results[thread_idx]["emails"][0]["conversationId"]
        status = generate_qa(thread_id)
        qa = qa_data.get(thread_id, {})
        questions = qa.get("questions", [])
        answer = qa.get("answers", [""])[0] if questions else ""
        return (
            status,
            gr.update(choices=questions, value=questions[0] if questions else None),
            answer,
            qa.get("summary", "")
        )

    gen_qa_button.click(
        fn=run_generate_qa,
        inputs=search_results_dropdown,
        outputs=[qa_status, question_dropdown, answer_output, summary_output]
    )

    # Export event
    def run_export(selected_label):
        thread_idx = _selected_thread_index(selected_label)
        if thread_idx < 0:
            return None
        return export_thread_data(search_results[thread_idx]["emails"][0]["conversationId"])

    export_thread_button.click(
        fn=run_export,
        inputs=search_results_dropdown,
        outputs=export_output
    )
# Launch the app
if __name__ == "__main__":
demo.launch()