Spaces:
Runtime error
Runtime error
import datetime
import json
import os
import tempfile
import time
from collections import Counter
from typing import List, Dict, Any, Tuple, Optional

import gradio as gr
import msal
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import tqdm
# Configuration
# Azure AD application settings. Each value may be overridden through the
# environment variable of the same name; the literals are the fallbacks.
MS_CLIENT_ID = os.environ.get("MS_CLIENT_ID", "ff0d5b77-56a9-4fa0-bd59-5c7b4889186e")
MS_TENANT_ID = os.environ.get("MS_TENANT_ID", "677c00b7-cf19-4fef-9962-132a076ae325")
MS_AUTHORITY = "https://login.microsoftonline.com/" + MS_TENANT_ID
MS_REDIRECT_URI = os.environ.get(
    "MS_REDIRECT_URI",
    "https://huggingface.co/spaces/YOUR-USERNAME/email-thread-analyzer/",
)

# Microsoft Graph API scopes
SCOPES = ["User.Read", "Mail.Read", "Mail.ReadBasic"]

# Global variables
# Module-level state shared by the functions below (rebound via ``global``).
auth_app = None        # MSAL client, created by init_auth_app()
current_user = None    # JSON profile returned by Graph /me
user_token = None      # token response dict from MSAL
emails = []            # raw Graph message dicts from the last extraction
email_threads = {}     # conversationId -> thread summary dict
search_results = []    # thread dicts from the last search, best match first
qa_data = {}           # conversationId -> {"questions", "answers", "summary"}
# Initialize MSAL app
def init_auth_app():
    """Build the MSAL public-client application and store it in ``auth_app``."""
    global auth_app
    client_settings = {
        "client_id": MS_CLIENT_ID,
        "authority": MS_AUTHORITY,
    }
    auth_app = msal.PublicClientApplication(**client_settings)
# Get authorization URL
def get_auth_url():
    """Return the sign-in URL produced by the MSAL client in ``auth_app``.

    The URL is requested for ``SCOPES`` with ``MS_REDIRECT_URI`` as the
    redirect target and a fixed ``state`` value.
    """
    request_args = {
        "scopes": SCOPES,
        "redirect_uri": MS_REDIRECT_URI,
        "state": "state",
    }
    return auth_app.get_authorization_request_url(**request_args)
# Process auth code
def process_auth_code(auth_code):
    """Exchange an authorization code for a token and load the user's profile.

    On success the token response is stored in ``user_token`` and the Graph
    ``/me`` profile in ``current_user``.

    Args:
        auth_code: The authorization code pasted by the user. Surrounding
            whitespace is ignored.

    Returns:
        A human-readable status string; failures are reported in the string
        rather than raised.
    """
    global current_user, user_token
    # Pasted codes frequently carry stray whitespace; an empty code can never
    # be redeemed, so report it without contacting the identity provider.
    code = (auth_code or "").strip()
    if not code:
        return "Error: no authorization code provided"
    try:
        # Acquire token
        token_response = auth_app.acquire_token_by_authorization_code(
            code=code,
            scopes=SCOPES,
            redirect_uri=MS_REDIRECT_URI
        )
        if "error" in token_response:
            # error_description is not guaranteed to accompany the error
            # code, so fall back to the code itself.
            detail = token_response.get("error_description", token_response["error"])
            return f"Error: {detail}"
        # Store token
        user_token = token_response
        # Get user info; the timeout keeps the UI from hanging on a stalled
        # connection.
        user_response = requests.get(
            "https://graph.microsoft.com/v1.0/me",
            headers={"Authorization": f"Bearer {user_token['access_token']}"},
            timeout=30
        )
        if user_response.status_code != 200:
            return f"Error getting user info: {user_response.text}"
        current_user = user_response.json()
        display_name = (
            current_user.get("displayName")
            or current_user.get("userPrincipalName")
            or "unknown user"
        )
        return f"Successfully authenticated as {display_name}"
    except Exception as e:
        return f"Error during authentication: {str(e)}"
# Get mail folders
def get_mail_folders():
    """List the signed-in user's mail folders via Microsoft Graph.

    Returns:
        A ``(folders, error)`` tuple. ``folders`` is a list of
        ``(displayName, id)`` pairs (empty on failure); ``error`` is ``None``
        on success and a message string otherwise.
    """
    if not user_token:
        return [], "Not authenticated"
    try:
        response = requests.get(
            "https://graph.microsoft.com/v1.0/me/mailFolders",
            headers={"Authorization": f"Bearer {user_token['access_token']}"},
            # Without a timeout a stalled connection would block the UI
            # callback indefinitely.
            timeout=30
        )
        if response.status_code != 200:
            return [], f"Error: {response.text}"
        folders = response.json().get("value", [])
        return [(folder["displayName"], folder["id"]) for folder in folders], None
    except Exception as e:
        return [], f"Error: {str(e)}"
# Extract emails from folder
def extract_emails(folder_id, max_emails=100, batch_size=25, start_date=None, end_date=None):
    """Download messages from a mail folder and group them into threads.

    Replaces the module-level ``emails`` list and rebuilds ``email_threads``
    through ``organize_email_threads()``.

    Args:
        folder_id: Graph id of the mail folder to read.
        max_emails: Upper bound on the number of messages kept. Coerced to
            ``int`` because numeric UI inputs may arrive as floats.
        batch_size: Page size per request (``$top``); coerced to ``int`` and
            must be at least 1.
        start_date: Optional ``YYYY-MM-DD`` lower bound on receivedDateTime.
        end_date: Optional ``YYYY-MM-DD`` upper bound on receivedDateTime.
            The date filter is applied only when both bounds are given.

    Returns:
        A status string; failures are reported in the string, not raised.
    """
    global emails, email_threads
    if not user_token:
        return "Not authenticated"
    if not folder_id:
        # Otherwise the literal text "None" would be spliced into the URL.
        return "Error: please select a mail folder first"
    try:
        # range() below rejects floats, so normalise the numeric inputs first.
        max_emails = int(max_emails)
        batch_size = int(batch_size)
    except (TypeError, ValueError):
        return "Error: max emails and batch size must be whole numbers"
    if batch_size < 1:
        return "Error: batch size must be at least 1"
    try:
        # Reset data
        emails = []
        email_threads = {}
        # Prepare filter
        filter_query = ""
        if start_date and end_date:
            # NOTE(review): the end bound is midnight at the start of
            # end_date, so messages received later that day are excluded --
            # confirm whether that is intended.
            start_date_iso = datetime.datetime.strptime(start_date, "%Y-%m-%d").isoformat() + "Z"
            end_date_iso = datetime.datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "Z"
            filter_query = f"receivedDateTime ge {start_date_iso} and receivedDateTime le {end_date_iso}"
        # These do not change between pages, so build them once.
        url = f"https://graph.microsoft.com/v1.0/me/mailFolders/{folder_id}/messages"
        headers = {"Authorization": f"Bearer {user_token['access_token']}"}
        # Extract emails in batches
        for offset in range(0, max_emails, batch_size):
            params = {
                "$select": "id,subject,sender,from,toRecipients,ccRecipients,receivedDateTime,conversationId,bodyPreview,uniqueBody",
                "$top": batch_size,
                "$skip": offset
            }
            if filter_query:
                params["$filter"] = filter_query
            # Make request; the timeout prevents a stalled connection from
            # hanging the extraction forever.
            response = requests.get(url, headers=headers, params=params, timeout=30)
            if response.status_code != 200:
                return f"Error: {response.text}"
            batch_emails = response.json().get("value", [])
            if not batch_emails:
                break
            emails.extend(batch_emails)
            if len(emails) >= max_emails:
                # The last page may overshoot the requested maximum.
                emails = emails[:max_emails]
                break
        # Organize emails into threads
        organize_email_threads()
        return f"Successfully extracted {len(emails)} emails organized into {len(email_threads)} threads"
    except Exception as e:
        return f"Error: {str(e)}"
# Organize emails into threads
def organize_email_threads():
    """Group ``emails`` by conversationId and rebuild ``email_threads``.

    Each thread entry holds its messages sorted by receivedDateTime plus
    metadata taken from the first and last message of that ordering.
    """
    global email_threads
    grouped = {}
    for message in emails:
        grouped.setdefault(message["conversationId"], []).append(message)

    summaries = {}
    for conversation_id, messages in grouped.items():
        # Sort emails within each thread by date
        messages.sort(key=lambda item: item["receivedDateTime"])
        first, last = messages[0], messages[-1]
        # Extract thread metadata
        summaries[conversation_id] = {
            "emails": messages,
            "subject": first["subject"],
            "start_date": first["receivedDateTime"],
            "end_date": last["receivedDateTime"],
            "message_count": len(messages),
            "participants": get_unique_participants(messages),
        }
    email_threads = summaries
# Get unique participants
def get_unique_participants(thread_emails):
    """Return the distinct addresses appearing in a thread, in first-seen order.

    For every message the sender, the ``toRecipients`` and the
    ``ccRecipients`` are inspected. Entries without an ``emailAddress`` or
    ``address`` are skipped instead of raising.

    Args:
        thread_emails: Iterable of Graph message dicts.

    Returns:
        A list of unique address strings. The order is deterministic (order
        of first appearance) because callers show the leading entries to the
        user and write the list to exports.
    """
    # A dict keeps insertion order, giving de-duplication without the
    # arbitrary ordering of a set.
    seen = {}
    for email in thread_emails:
        people = [email.get("sender")]
        people.extend(email.get("toRecipients") or [])
        people.extend(email.get("ccRecipients") or [])
        for person in people:
            address = ((person or {}).get("emailAddress") or {}).get("address")
            if address:
                seen.setdefault(address, None)
    return list(seen)
# Search threads using simple keyword matching
def search_threads(query):
    """Rank threads by how often the query terms occur in their text.

    The searchable text of a thread is its lower-cased subject followed by
    the lower-cased bodyPreview of each message. Matching threads are stored
    in ``search_results``, highest score first.

    Returns:
        A status string describing the outcome.
    """
    global search_results
    if not (query and email_threads):
        search_results = []
        return "Please enter a search query and ensure emails have been extracted"
    try:
        # Search terms
        terms = query.lower().split()
        scored = []
        for thread in email_threads.values():
            # Prepare text content from thread
            pieces = [f"{thread['subject'].lower()} "]
            pieces.extend(f"{message['bodyPreview'].lower()} " for message in thread["emails"])
            haystack = "".join(pieces)
            # Score is the total number of occurrences across all terms
            hits = sum(haystack.count(term) for term in terms)
            if hits > 0:
                scored.append((thread, hits))
        # Sort by score
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
        search_results = [thread for thread, _ in ranked]
        if search_results:
            return f"Found {len(search_results)} relevant threads"
        return "No relevant threads found"
    except Exception as exc:
        search_results = []
        return f"Error: {str(exc)}"
# Generate Q&A for thread
def generate_qa(thread_id):
    """Create templated questions, answers and a summary for one thread.

    The texts are filled in from the thread's metadata (subject, dates,
    participants, message count) and stored in ``qa_data[thread_id]``.

    Args:
        thread_id: Key of the thread in ``email_threads``.

    Returns:
        A status string: "Thread not found", a success message, or an error
        description.
    """
    if thread_id not in email_threads:
        return "Thread not found"
    try:
        thread = email_threads[thread_id]
        # The earlier version assembled a per-message text here by appending
        # to a variable that was never initialised, so every call failed.
        # That text was never used afterwards, so it is dropped.
        subject = thread["subject"]
        participants = thread["participants"]
        # Keep only the date part of the ISO timestamps.
        started = thread["start_date"].split("T")[0]
        ended = thread["end_date"].split("T")[0]
        # Generate sample questions
        questions = [
            f"What is the main topic of this email thread about '{subject}'?",
            "Who are the key participants in this conversation?",
            "What was the timeline of this discussion?",
            "What were the main points discussed in this thread?"
        ]
        # Name at most three participants and summarise the remainder.
        key_people = ", ".join(participants[:3])
        if len(participants) > 3:
            key_people += f" and {len(participants) - 3} others"
        # Generate simple answers (simulating AI responses)
        answers = [
            f"The main topic appears to be '{subject}', which discusses project-related matters.",
            f"The key participants include {key_people}",
            f"The conversation started on {started} and the last message was on {ended}.",
            "The main points include updates on project status, discussion of requirements, and next steps."
        ]
        # Create summary
        summary = (
            f"This is an email thread with {thread['message_count']} messages about '{subject}'. "
            f"The conversation started on {started} and ended on {ended}. "
            f"There are {len(participants)} participants in this thread."
        )
        # Store Q&A data
        qa_data[thread_id] = {
            "questions": questions,
            "answers": answers,
            "summary": summary
        }
        return f"Generated {len(questions)} Q&A pairs for thread"
    except Exception as e:
        return f"Error generating Q&A: {str(e)}"
# Get thread size distribution
def get_thread_size_distribution():
    """Return a bar chart of how many threads have each message count.

    Returns ``None`` when ``email_threads`` is empty.
    """
    if not email_threads:
        return None
    # Count threads by size
    tally = {}
    for info in email_threads.values():
        n_messages = info["message_count"]
        tally[n_messages] = tally.get(n_messages, 0) + 1
    # Convert to dataframe, ordered by thread size
    rows = [{"Size": n_messages, "Count": n_threads} for n_messages, n_threads in tally.items()]
    frame = pd.DataFrame(rows).sort_values("Size")
    # Create chart
    return px.bar(frame, x="Size", y="Count", title="Thread Size Distribution")
# Get activity over time
def get_activity_over_time():
    """Return a line chart of the number of emails received per calendar date.

    The date is the part of receivedDateTime before the ``T``. Returns
    ``None`` when ``emails`` is empty.
    """
    if not emails:
        return None
    # Count emails by date
    per_day = {}
    for message in emails:
        day = message["receivedDateTime"].split("T")[0]
        per_day[day] = per_day.get(day, 0) + 1
    # Convert to dataframe, ordered by date
    frame = pd.DataFrame([{"Date": day, "Count": total} for day, total in per_day.items()])
    frame = frame.sort_values("Date")
    # Create chart
    return px.line(frame, x="Date", y="Count", title="Activity Over Time")
# Get participant activity
def get_participant_activity():
    """Return a horizontal bar chart of the ten most frequent senders.

    Returns:
        A Plotly figure, or ``None`` when there are no emails or none of them
        carries a sender address (nothing to plot).
    """
    if not emails:
        return None
    # Count emails by sender, skipping messages without a sender address
    senders = Counter(
        email["sender"]["emailAddress"]["address"]
        for email in emails
        if "sender" in email and "emailAddress" in email["sender"]
    )
    if not senders:
        # An empty frame has no "Count" column, so building and sorting it
        # would raise; report "no chart" instead.
        return None
    # most_common() orders by count descending and keeps ties in first-seen
    # order, so the top ten are deterministic.
    df = pd.DataFrame(senders.most_common(10), columns=["Participant", "Count"])
    # Create chart
    fig = px.bar(df, x="Count", y="Participant", title="Top 10 Participants", orientation='h')
    return fig
# Export thread data with Q&A
def export_thread_data(thread_id):
    """Write one thread, with any stored Q&A, to a temporary JSON file.

    Args:
        thread_id: Key of the thread in ``email_threads``.

    Returns:
        The path of the JSON file, or ``None`` if the thread is unknown. The
        file is created with ``delete=False`` and therefore persists after
        it is closed.
    """
    if thread_id not in email_threads:
        return None
    thread = email_threads[thread_id]
    empty_qa = {"questions": [], "answers": [], "summary": ""}

    messages = []
    for message in thread["emails"]:
        messages.append({
            "sender": message["sender"]["emailAddress"]["address"],
            "received_date_time": message["receivedDateTime"],
            "subject": message["subject"],
            "body_preview": message["bodyPreview"],
        })

    copied_fields = ("subject", "start_date", "end_date", "message_count", "participants")
    payload = {field: thread[field] for field in copied_fields}
    payload["emails"] = messages
    payload["qa"] = qa_data.get(thread_id, empty_qa)

    # Save to temporary file
    with tempfile.NamedTemporaryFile(delete=False, suffix='.json', mode='w') as handle:
        json.dump(payload, handle, indent=2)
    return handle.name
# Initialize
init_auth_app()


# UI callbacks
#
# Event listeners take *components* as inputs and receive their current
# values when the event fires. The earlier wiring passed lambdas as inputs
# and read ``search_results_dropdown.value`` while the UI was being built,
# which cannot reflect the user's selection. The callbacks below therefore
# take the dropdown value as an ordinary argument, and dropdown choices are
# replaced through ``gr.update(choices=...)`` (returning a bare list would
# set the dropdown's value instead).

def _sender_of(email):
    """Return the sender address of a message dict, or "unknown" if absent."""
    return ((email.get("sender") or {}).get("emailAddress") or {}).get("address", "unknown")


def _selected_thread(thread_idx):
    """Map the search-results selection to ``(thread_id, thread)``.

    ``thread_idx`` is the position in ``search_results`` carried as the
    dropdown value. With no selection the first result is used; an invalid
    or out-of-range value yields ``(None, None)``.
    """
    try:
        position = 0 if thread_idx is None else int(thread_idx)
    except (TypeError, ValueError):
        return None, None
    if 0 <= position < len(search_results):
        thread = search_results[position]
        return thread["emails"][0]["conversationId"], thread
    return None, None


def _qa_outputs(thread_id):
    """Return (question dropdown update, first answer, summary) for a thread."""
    qa = qa_data.get(thread_id, {})
    questions = qa.get("questions", [])
    answers = qa.get("answers", [])
    first_answer = answers[0] if questions and answers else ""
    dropdown = gr.update(choices=questions, value=questions[0] if questions else None)
    return dropdown, first_answer, qa.get("summary", "")


def refresh_folders():
    """Reload the folder choices and report the outcome instead of hiding it."""
    folders, error = get_mail_folders()
    status = error if error else f"Loaded {len(folders)} mail folders"
    # folders are (displayName, id) pairs, i.e. (label, value) choices
    return gr.update(choices=folders, value=None), status


def generate_analytics():
    """Build the three analytics charts from the extracted emails."""
    if not emails:
        return "No emails extracted yet", None, None, None
    return (
        "Analytics generated successfully",
        get_thread_size_distribution(),
        get_activity_over_time(),
        get_participant_activity(),
    )


def run_search(query):
    """Run the keyword search and refresh the results dropdown."""
    status = search_threads(query)
    # The value of each choice is its position in search_results, so two
    # threads with identical labels stay distinguishable.
    choices = [
        (f"{thread['subject']} ({thread['message_count']} messages)", position)
        for position, thread in enumerate(search_results)
    ]
    return status, gr.update(choices=choices, value=0 if choices else None)


def view_thread_details(thread_idx):
    """Show the selected thread: header, content, Q&A and an export file."""
    thread_id, thread = _selected_thread(thread_idx)
    if thread is None:
        return "No thread selected", "", gr.update(choices=[], value=None), "", "", None
    # Generate thread content
    parts = [f"Subject: {thread['subject']}\n\n"]
    for email in thread["emails"]:
        parts.append(f"From: {_sender_of(email)} | Date: {email['receivedDateTime']}\n")
        parts.append(f"Content: {email['bodyPreview']}\n\n")
    content = "".join(parts)
    # Generate Q&A if not already generated
    if thread_id not in qa_data:
        generate_qa(thread_id)
    questions_update, answer, summary = _qa_outputs(thread_id)
    return (
        f"Thread: {thread['subject']} ({thread['message_count']} messages)",
        content,
        questions_update,
        answer,
        summary,
        export_thread_data(thread_id),
    )


def answer_for_question(question, thread_idx):
    """Return the stored answer matching ``question`` for the selected thread."""
    thread_id, _ = _selected_thread(thread_idx)
    qa = qa_data.get(thread_id, {})
    questions = qa.get("questions", [])
    answers = qa.get("answers", [])
    if question in questions:
        position = questions.index(question)
        if position < len(answers):
            return answers[position]
    return ""


def regenerate_qa(thread_idx):
    """(Re)generate the Q&A of the selected thread and refresh the widgets."""
    thread_id, _ = _selected_thread(thread_idx)
    if thread_id is None:
        return "No thread selected", gr.update(choices=[], value=None), "", ""
    status = generate_qa(thread_id)
    return (status, *_qa_outputs(thread_id))


def export_selected_thread(thread_idx):
    """Export the selected thread to a JSON file, or return None."""
    thread_id, _ = _selected_thread(thread_idx)
    return export_thread_data(thread_id) if thread_id is not None else None


# Create the Gradio interface
with gr.Blocks(title="Email Thread Analyzer with AI Q&A") as demo:
    gr.Markdown("# Email Thread Analyzer with AI Q&A")
    # Authentication section
    with gr.Tab("Authentication"):
        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("## Sign in with Microsoft")
                gr.Markdown("1. Click 'Get Authentication URL' to start the sign-in process")
                gr.Markdown("2. Copy the authorization code from the redirect URL")
                gr.Markdown("3. Paste the code below and submit")
            with gr.Column(scale=3):
                auth_url_button = gr.Button("Get Authentication URL")
                auth_url_output = gr.Textbox(label="Authentication URL", interactive=False)
                auth_code_input = gr.Textbox(label="Authorization Code")
                auth_submit = gr.Button("Submit Authorization Code")
                auth_status = gr.Textbox(label="Authentication Status", interactive=False)
    # Email Extraction section
    with gr.Tab("Email Extraction"):
        with gr.Row():
            with gr.Column():
                folder_dropdown = gr.Dropdown(label="Select Mail Folder")
                refresh_folders_button = gr.Button("Refresh Folders")
                with gr.Row():
                    # precision=0 makes the inputs arrive as ints
                    max_emails_input = gr.Number(label="Max Emails", value=100, minimum=1, maximum=1000, precision=0)
                    batch_size_input = gr.Number(label="Batch Size", value=25, minimum=1, maximum=100, precision=0)
                with gr.Row():
                    start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)")
                    end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)")
                extract_button = gr.Button("Extract Emails")
                extraction_status = gr.Textbox(label="Extraction Status", interactive=False)
    # Thread Analysis section
    with gr.Tab("Thread Analysis"):
        with gr.Row():
            with gr.Column():
                analysis_status = gr.Textbox(label="Analysis Status")
                with gr.Tabs():
                    with gr.Tab("Thread Size"):
                        thread_size_plot = gr.Plot(label="Thread Size Distribution")
                    with gr.Tab("Activity Over Time"):
                        activity_plot = gr.Plot(label="Activity Over Time")
                    with gr.Tab("Top Participants"):
                        participants_plot = gr.Plot(label="Top Participants")
                generate_analytics_button = gr.Button("Generate Analytics")
    # Search section
    with gr.Tab("Search"):
        with gr.Row():
            with gr.Column():
                search_input = gr.Textbox(label="Search Query")
                search_button = gr.Button("Search")
                search_status = gr.Textbox(label="Search Status", interactive=False)
            with gr.Column():
                search_results_dropdown = gr.Dropdown(label="Search Results")
                view_thread_button = gr.Button("View Thread")
    # Q&A section
    with gr.Tab("Q&A"):
        with gr.Row():
            with gr.Column():
                thread_info = gr.Textbox(label="Thread Information", interactive=False)
                qa_status = gr.Textbox(label="Q&A Status", interactive=False)
                with gr.Accordion("Thread Content", open=False):
                    thread_content = gr.Textbox(label="Thread Content", interactive=False, lines=10)
                with gr.Row():
                    question_dropdown = gr.Dropdown(label="Questions")
                    gen_qa_button = gr.Button("Generate Q&A")
                answer_output = gr.Textbox(label="Answer", interactive=False, lines=5)
                summary_output = gr.Textbox(label="Summary", interactive=False, lines=5)
                export_thread_button = gr.Button("Export Thread Data")
                export_output = gr.File(label="Export Data")

    # Set up event handlers
    # Authentication events
    auth_url_button.click(
        fn=get_auth_url,
        outputs=auth_url_output
    )
    auth_submit.click(
        fn=process_auth_code,
        inputs=auth_code_input,
        outputs=auth_status
    )
    # Folder refresh event (the status box also shows any lookup error)
    refresh_folders_button.click(
        fn=refresh_folders,
        outputs=[folder_dropdown, extraction_status]
    )
    # Email extraction event
    extract_button.click(
        fn=extract_emails,
        inputs=[folder_dropdown, max_emails_input, batch_size_input, start_date_input, end_date_input],
        outputs=extraction_status
    )
    # Analytics generation event
    generate_analytics_button.click(
        fn=generate_analytics,
        outputs=[analysis_status, thread_size_plot, activity_plot, participants_plot]
    )
    # Search events
    search_button.click(
        fn=run_search,
        inputs=search_input,
        outputs=[search_status, search_results_dropdown]
    )
    # Thread view event
    view_thread_button.click(
        fn=view_thread_details,
        inputs=search_results_dropdown,
        outputs=[thread_info, thread_content, question_dropdown, answer_output, summary_output, export_output]
    )
    # Q&A events
    question_dropdown.change(
        fn=answer_for_question,
        inputs=[question_dropdown, search_results_dropdown],
        outputs=answer_output
    )
    gen_qa_button.click(
        fn=regenerate_qa,
        inputs=search_results_dropdown,
        outputs=[qa_status, question_dropdown, answer_output, summary_output]
    )
    # Export event
    export_thread_button.click(
        fn=export_selected_thread,
        inputs=search_results_dropdown,
        outputs=export_output
    )

# Launch the app
if __name__ == "__main__":
    demo.launch()