Spaces:
Running
Running
import os | |
import gradio as gr | |
from transformers import pipeline | |
import spacy | |
import lib.read_pdf | |
import pandas as pd | |
import re | |
import matplotlib.pyplot as plt | |
import matplotlib.patches as patches | |
import io | |
# Initialize spaCy model | |
nlp = spacy.load('en_core_web_sm') | |
nlp.add_pipe('sentencizer') | |
def split_in_sentences(text): | |
doc = nlp(text) | |
return [str(sent).strip() for sent in doc.sents] | |
def make_spans(text, results): | |
results_list = [res['label'] for res in results] | |
facts_spans = list(zip(split_in_sentences(text), results_list)) | |
return facts_spans | |
# Initialize pipelines | |
summarizer = pipeline("summarization", model="human-centered-summarization/financial-summarization-pegasus") | |
fin_model = pipeline("sentiment-analysis", model='yiyanghkust/finbert-tone', tokenizer='yiyanghkust/finbert-tone') | |
fin_model_bis = pipeline("sentiment-analysis", model='ProsusAI/finbert', tokenizer='ProsusAI/finbert') | |
def summarize_text(text): | |
resp = summarizer(text) | |
return resp[0]['summary_text'] | |
def text_to_sentiment(text): | |
sentiment = fin_model(text)[0]["label"] | |
return sentiment | |
def fin_ext(text): | |
results = fin_model(split_in_sentences(text)) | |
return make_spans(text, results) | |
def fin_ext_bis(text): | |
results = fin_model_bis(split_in_sentences(text)) | |
return make_spans(text, results) | |
def extract_and_summarize(pdf1, pdf2): | |
if not pdf1 or not pdf2: | |
return [], [] | |
pdf1_path = os.path.join(PDF_FOLDER, pdf1) | |
pdf2_path = os.path.join(PDF_FOLDER, pdf2) | |
# Extract and format paragraphs | |
paragraphs_1 = lib.read_pdf.extract_and_format_paragraphs(pdf1_path) | |
paragraphs_2 = lib.read_pdf.extract_and_format_paragraphs(pdf2_path) | |
start_keyword = "Main risks to" | |
end_keywords = ["4. Appendix", "Annex:", "4. Annex", "Detailed tables", "ACKNOWLEDGEMENTS", "STATISTICAL ANNEX", "PROSPECTS BY MEMBER STATES"] | |
start_index1, end_index1 = lib.read_pdf.find_text_range(paragraphs_1, start_keyword, end_keywords) | |
start_index2, end_index2 = lib.read_pdf.find_text_range(paragraphs_2, start_keyword, end_keywords) | |
paragraphs_1 = lib.read_pdf.extract_relevant_text(paragraphs_1, start_index1, end_index1) | |
paragraphs_2 = lib.read_pdf.extract_relevant_text(paragraphs_2, start_index2, end_index2) | |
paragraphs_1 = lib.read_pdf.split_text_into_paragraphs(paragraphs_1, 0) | |
paragraphs_2 = lib.read_pdf.split_text_into_paragraphs(paragraphs_2, 0) | |
return paragraphs_1, paragraphs_2 | |
# Gradio interface setup | |
PDF_FOLDER = "data" | |
def get_pdf_files(folder): | |
return [f for f in os.listdir(folder) if f.endswith('.pdf')] | |
def show(name): | |
return f"{name}" | |
def get_excel_files(folder): | |
return [f for f in os.listdir(folder) if f.endswith('.xlsx')] | |
def get_sheet_names(file): | |
xls = pd.ExcelFile(os.path.join(PDF_FOLDER, file)) | |
return xls.sheet_names | |
def process_and_compare(file1, sheet1, file2, sheet2): | |
def process_file(file_path, sheet_name): | |
# Extract year from file name | |
year = int(re.search(r'(\d{4})', file_path).group(1)) | |
# Load the Excel file | |
df = pd.read_excel(os.path.join(PDF_FOLDER, file_path), sheet_name=sheet_name, index_col=0) | |
# Define expected columns based on extracted year | |
historical_col = f'Historical {year - 1}' | |
baseline_cols = [f'Baseline {year}', f'Baseline {year + 1}', f'Baseline {year + 2}'] | |
adverse_cols = [f'Adverse {year}', f'Adverse {year + 1}', f'Adverse {year + 2}'] | |
level_deviation_col = f'Level Deviation {year + 2}' | |
# Drop rows and reset index | |
df = df.iloc[4:].reset_index(drop=True) | |
# Define the new column names | |
new_columns = ['Country', 'Code', historical_col] + baseline_cols + adverse_cols + ['Adverse Cumulative', 'Adverse Minimum', level_deviation_col] | |
# Ensure the number of columns matches | |
if len(df.columns) == len(new_columns): | |
df.columns = new_columns | |
else: | |
raise ValueError(f"Expected {len(new_columns)} columns, but found {len(df.columns)} columns in the data.") | |
return df | |
# Process both files | |
df1 = process_file(file1, sheet1) | |
df2 = process_file(file2, sheet2) | |
year1 = int(re.search(r'(\d{4})', file1).group(1)) | |
year2 = int(re.search(r'(\d{4})', file2).group(1)) | |
# Calculate the differences | |
# historical_col1 = f'Historical {int(year1) - 1}' | |
# historical_col2 = f'Historical {int(year2) - 1}' | |
# df1['Historical vs Adverse'] = df1[historical_col1] - df1['Adverse Cumulative'] | |
# df2['Historical vs Adverse'] = df2[historical_col2] - df2['Adverse Cumulative'] | |
# Merge dataframes on 'Country' | |
merged_df = pd.merge(df2, df1, on='Country', suffixes=(f'_{year1}', f'_{year2}')) | |
merged_df['Difference adverse cumulative growth'] = merged_df[f'Adverse Cumulative_{year2}'] - merged_df[f'Adverse Cumulative_{year1}'] | |
# Ensure data types are correct | |
merged_df['Country'] = merged_df['Country'].astype(str) | |
merged_df['Difference adverse cumulative growth'] = pd.to_numeric(merged_df['Difference adverse cumulative growth'], errors='coerce') | |
# Create histogram plot with color coding | |
fig, ax = plt.subplots(figsize=(12, 8)) | |
colors = plt.get_cmap('tab20').colors # Use a colormap with multiple colors | |
num_countries = len(merged_df['Country']) | |
bars = ax.bar(merged_df['Country'], merged_df['Difference adverse cumulative growth'], color=colors[:num_countries]) | |
# Add a legend | |
handles = [patches.Patch(color=color, label=country) for color, country in zip(colors[:num_countries], merged_df['Country'])] | |
ax.legend(handles=handles, title='Countries', bbox_to_anchor=(1.05, 1), loc='upper left') | |
ax.set_title(f'Histogram of Difference between Adverse cumulative growth of {year2} and {year1} for {sheet1}') | |
ax.set_xlabel('Country') | |
ax.set_ylabel('Difference') | |
plt.xticks(rotation=90) | |
# Save plot to BytesIO object | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png', bbox_inches='tight') | |
buf.seek(0) | |
img = buf.getvalue() | |
buf.close() | |
return img | |
stored_paragraphs_1 = [] | |
stored_paragraphs_2 = [] | |
with gr.Blocks() as demo: | |
with gr.Tab("Financial Report Text Analysis"): | |
gr.Markdown("## Financial Report Paragraph Selection and Analysis on adverse macro-economy scenario") | |
with gr.Row(): | |
# Upload PDFs | |
with gr.Column(): | |
pdf1 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 1") | |
pdf2 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 2") | |
with gr.Column(): | |
b1 = gr.Button("Extract and Display Paragraphs") | |
paragraph_1_dropdown = gr.Dropdown(label="Select Paragraph from PDF 1") | |
paragraph_2_dropdown = gr.Dropdown(label="Select Paragraph from PDF 2") | |
def update_paragraphs(pdf1, pdf2): | |
global stored_paragraphs_1, stored_paragraphs_2 | |
stored_paragraphs_1, stored_paragraphs_2 = extract_and_summarize(pdf1, pdf2) | |
updated_dropdown_1 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_1)] | |
updated_dropdown_2 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_2)] | |
return gr.update(choices=updated_dropdown_1), gr.update(choices=updated_dropdown_2) | |
b1.click(fn=update_paragraphs, inputs=[pdf1, pdf2], outputs=[paragraph_1_dropdown, paragraph_2_dropdown]) | |
with gr.Row(): | |
# Process the selected paragraph from PDF 1 | |
with gr.Column(): | |
gr.Markdown("### PDF 1 Analysis") | |
selected_paragraph_1 = gr.Textbox(label="Selected Paragraph 1 Content", lines=4) | |
selected_paragraph_1.change(show, paragraph_1_dropdown, selected_paragraph_1) | |
summarize_btn1 = gr.Button("Summarize Text from PDF 1") | |
summary_textbox_1 = gr.Textbox(label="Summary for PDF 1", lines=2) | |
summarize_btn1.click(fn=lambda p: process_paragraph_1_sum(p), inputs=paragraph_1_dropdown, outputs=summary_textbox_1) | |
sentiment_btn1 = gr.Button("Classify Financial Tone from PDF 1") | |
sentiment_textbox_1 = gr.Textbox(label="Classification for PDF 1", lines=1) | |
sentiment_btn1.click(fn=lambda p: process_paragraph_1_sent(p), inputs=paragraph_1_dropdown, outputs=sentiment_textbox_1) | |
analyze_btn1 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone") | |
fin_spans_1 = gr.HighlightedText(label="Financial Tone Analysis for PDF 1") | |
analyze_btn1.click(fn=lambda p: process_paragraph_1_sent_tone(p), inputs=paragraph_1_dropdown, outputs=fin_spans_1) | |
analyze_btn1_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert") | |
fin_spans_1_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 1 bis") | |
analyze_btn1_.click(fn=lambda p: process_paragraph_1_sent_tone_bis(p), inputs=paragraph_1_dropdown, outputs=fin_spans_1_) | |
# Process the selected paragraph from PDF 2 | |
with gr.Column(): | |
gr.Markdown("### PDF 2 Analysis") | |
selected_paragraph_2 = gr.Textbox(label="Selected Paragraph 2 Content", lines=4) | |
selected_paragraph_2.change(show, paragraph_2_dropdown, selected_paragraph_2) | |
summarize_btn2 = gr.Button("Summarize Text from PDF 2") | |
summary_textbox_2 = gr.Textbox(label="Summary for PDF 2", lines=2) | |
summarize_btn2.click(fn=lambda p: process_paragraph_2_sum(p), inputs=paragraph_2_dropdown, outputs=summary_textbox_2) | |
sentiment_btn2 = gr.Button("Classify Financial Tone from PDF 2") | |
sentiment_textbox_2 = gr.Textbox(label="Classification for PDF 2", lines=1) | |
sentiment_btn2.click(fn=lambda p: process_paragraph_2_sent(p), inputs=paragraph_2_dropdown, outputs=sentiment_textbox_2) | |
analyze_btn2 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone") | |
fin_spans_2 = gr.HighlightedText(label="Financial Tone Analysis for PDF 2") | |
analyze_btn2.click(fn=lambda p: process_paragraph_2_sent_tone(p), inputs=paragraph_2_dropdown, outputs=fin_spans_2) | |
analyze_btn2_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert") | |
fin_spans_2_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 2 bis") | |
analyze_btn2_.click(fn=lambda p: process_paragraph_2_sent_tone_bis(p), inputs=paragraph_2_dropdown, outputs=fin_spans_2_) | |
with gr.Tab("Financial Report Table Analysis"): | |
# New tab content goes here | |
gr.Markdown("## Excel Data Comparison") | |
with gr.Row(): | |
with gr.Column(): | |
file1 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 1") | |
file2 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 2") | |
sheet = gr.Dropdown(choices=[], label="Select Sheet for File 1 and 2") | |
with gr.Column(): | |
result = gr.Image(label="Comparison pLot") | |
def update_sheets(file): | |
return get_sheet_names(file) | |
file1.change(fn=update_sheets, inputs=file1, outputs=sheet) | |
file2.change(fn=update_sheets, inputs=file2, outputs=sheet) | |
b1 = gr.Button("Compare Data") | |
b1.click(fn=process_and_compare, inputs=[file1, sheet, file2, sheet], outputs=result) | |
demo.launch() | |