import os
import gradio as gr
from transformers import pipeline
import spacy
import lib.read_pdf
import pandas as pd
import re
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import io

# Initialize spaCy model
nlp = spacy.load('en_core_web_sm')
nlp.add_pipe('sentencizer')
def split_in_sentences(text):
    doc = nlp(text)
    return [str(sent).strip() for sent in doc.sents]

def make_spans(text, results):
    results_list = [res['label'] for res in results]
    facts_spans = list(zip(split_in_sentences(text), results_list))
    return facts_spans
# Initialize pipelines
summarizer = pipeline("summarization", model="human-centered-summarization/financial-summarization-pegasus")
fin_model = pipeline("sentiment-analysis", model='yiyanghkust/finbert-tone', tokenizer='yiyanghkust/finbert-tone')
fin_model_bis = pipeline("sentiment-analysis", model='ProsusAI/finbert', tokenizer='ProsusAI/finbert')
table_to_text = pipeline('text2text-generation', model='google/flan-t5-large')
def summarize_text(text):
    resp = summarizer(text)
    return resp[0]['summary_text']

def text_to_sentiment(text):
    sentiment = fin_model(text)[0]["label"]
    return sentiment

def fin_ext(text):
    results = fin_model(split_in_sentences(text))
    return make_spans(text, results)

def fin_ext_bis(text):
    results = fin_model_bis(split_in_sentences(text))
    return make_spans(text, results)
def extract_and_paragraph(pdf1, pdf2, paragraph):
    if not pdf1 or not pdf2:
        return [], []

    pdf1_path = os.path.join(PDF_FOLDER, pdf1)
    pdf2_path = os.path.join(PDF_FOLDER, pdf2)

    # Extract and format paragraphs
    paragraphs_1 = lib.read_pdf.extract_and_format_paragraphs(pdf1_path)
    paragraphs_2 = lib.read_pdf.extract_and_format_paragraphs(pdf2_path)

    start_keyword = "Main risks to"
    end_keywords = ["4. Appendix", "Annex:", "4. Annex", "Detailed tables", "ACKNOWLEDGEMENTS", "STATISTICAL ANNEX", "PROSPECTS BY MEMBER STATES"]

    start_index1, end_index1 = lib.read_pdf.find_text_range(paragraphs_1, start_keyword, end_keywords)
    start_index2, end_index2 = lib.read_pdf.find_text_range(paragraphs_2, start_keyword, end_keywords)

    paragraphs_1 = lib.read_pdf.extract_relevant_text(paragraphs_1, start_index1, end_index1)
    paragraphs_2 = lib.read_pdf.extract_relevant_text(paragraphs_2, start_index2, end_index2)

    if paragraph:
        paragraphs_1 = lib.read_pdf.split_text_into_paragraphs(paragraphs_1, 0)
        paragraphs_2 = lib.read_pdf.split_text_into_paragraphs(paragraphs_2, 0)

    return paragraphs_1, paragraphs_2
# Gradio interface setup
PDF_FOLDER = "data"

def get_pdf_files(folder):
    return [f for f in os.listdir(folder) if f.endswith('.pdf')]

def show(name):
    return f"{name}"

def get_excel_files(folder):
    return [f for f in os.listdir(folder) if f.endswith('.xlsx')]

def get_sheet_names(file):
    xls = pd.ExcelFile(os.path.join(PDF_FOLDER, file))
    return gr.update(choices=xls.sheet_names)
def process_and_compare(file1, sheet1, file2, sheet2):
    def process_file(file_path, sheet_name):
        # Extract year from file name
        year = int(re.search(r'(\d{4})', file_path).group(1))

        # Load the Excel file
        df = pd.read_excel(os.path.join(PDF_FOLDER, file_path), sheet_name=sheet_name, index_col=0)

        # Define expected columns based on extracted year
        historical_col = f'Historical {year - 1}'
        baseline_cols = [f'Baseline {year}', f'Baseline {year + 1}', f'Baseline {year + 2}']
        adverse_cols = [f'Adverse {year}', f'Adverse {year + 1}', f'Adverse {year + 2}']
        level_deviation_col = f'Level Deviation {year + 2}'

        # Drop the first four rows and reset the index
        df = df.iloc[4:].reset_index(drop=True)

        # Define the new column names
        new_columns = ['Country', 'Code', historical_col] + baseline_cols + adverse_cols + ['Adverse Cumulative', 'Adverse Minimum', level_deviation_col]

        # Ensure the number of columns matches
        if len(df.columns) == len(new_columns):
            df.columns = new_columns
        else:
            raise ValueError(f"Expected {len(new_columns)} columns, but found {len(df.columns)} columns in the data.")

        columns = ['Country', f'Adverse {year}', f'Adverse {year + 1}', f'Adverse {year + 2}', 'Adverse Cumulative']
        return df, df[columns]

    # Process both files
    global stored_df1, stored_df2
    df1, stored_df1 = process_file(file1, sheet1)
    df2, stored_df2 = process_file(file2, sheet2)
    year1 = int(re.search(r'(\d{4})', file1).group(1))
    year2 = int(re.search(r'(\d{4})', file2).group(1))

    # Merge dataframes on 'Country' (df2 is the left frame, so its columns get the year2 suffix)
    merged_df = pd.merge(df2, df1, on='Country', suffixes=(f'_{year2}', f'_{year1}'))
    merged_df['Difference adverse cumulative growth'] = merged_df[f'Adverse Cumulative_{year2}'] - merged_df[f'Adverse Cumulative_{year1}']

    # Ensure data types are correct
    merged_df['Country'] = merged_df['Country'].astype(str)
    merged_df['Difference adverse cumulative growth'] = pd.to_numeric(merged_df['Difference adverse cumulative growth'], errors='coerce')

    # Create bar plot with one color per country
    fig, ax = plt.subplots(figsize=(12, 8))
    colors = plt.get_cmap('tab20').colors  # Use a colormap with multiple colors
    num_countries = len(merged_df['Country'])
    bars = ax.bar(merged_df['Country'], merged_df['Difference adverse cumulative growth'], color=colors[:num_countries])

    # Add a legend
    handles = [patches.Patch(color=color, label=country) for color, country in zip(colors[:num_countries], merged_df['Country'])]
    ax.legend(handles=handles, title='Countries', bbox_to_anchor=(1.05, 1), loc='upper left')
    ax.set_title(f'Difference between adverse cumulative growth of {year2} and {year1} for {sheet1}')
    ax.set_xlabel('Country')
    ax.set_ylabel('Difference')
    plt.xticks(rotation=90)

    # Save plot to a file
    os.makedirs('output', exist_ok=True)
    file_path = 'output/plot.png'
    plt.savefig(file_path, format='png', bbox_inches='tight')
    plt.close()

    return file_path, gr.update(choices=stored_df1.Country.values.tolist()), gr.update(choices=stored_df2.Country.values.tolist())
def find_sentences_with_keywords(text, keywords):
    # Split text into sentences using a regular expression that matches sentence-ending punctuation
    sentences = re.split(r'(?<=[.!?])\s+', text)
    matched_sentences = set()  # Use a set to store unique sentences

    # For each keyword, find sentences that contain the keyword as a whole word
    for keyword in keywords:
        keyword_pattern = re.compile(rf'\b{re.escape(keyword)}\b', re.IGNORECASE)  # Word boundaries
        for sentence in sentences:
            if keyword_pattern.search(sentence):
                matched_sentences.add(sentence)  # Add to the set to ensure uniqueness

    return list(matched_sentences)  # Convert the set back to a list for consistent output
# Main function to process both PDFs based on the Excel file names and the sheet name
def process_pdfs_and_analyze_sentiment(file1, file2, sheet):
    # Extract text from both PDFs based on the file name
    pdf_file1 = file1.replace(".xlsx", ".pdf")
    pdf_file2 = file2.replace(".xlsx", ".pdf")
    text1, text2 = extract_and_paragraph(pdf_file1, pdf_file2, False)

    # Use the sheet name as the keyword to find relevant sentences
    keywords = {
        'GDP': ['GDP'],
        'HICP': ['HICP'],
        'RRE prices': ['RRE', 'residential'],
        'CRE prices': ['CRE', 'commercial'],
        'Unemployment': ['unemployment']
    }
    selected_keywords = keywords.get(sheet, [])

    # Find sentences containing the keywords
    sentences1 = find_sentences_with_keywords(text1, selected_keywords)
    sentences2 = find_sentences_with_keywords(text2, selected_keywords)

    # Concatenate all sentences for each PDF
    text_pdf1 = "\n".join(sentences1)
    text_pdf2 = "\n".join(sentences2)

    # Perform sentiment analysis on the extracted sentences for each PDF
    result_pdf1 = fin_ext_bis(text_pdf1)
    result_pdf2 = fin_ext_bis(text_pdf2)

    return result_pdf1, result_pdf2
#def change_choices(df):
#    return gr.update(choices=df.Country.values.tolist())

def generate_text(df, country, theme):
    # Filter the dataframe based on the country
    #for column in df.columns:
    #    if column != 'Country':
    #        df[column] = df[column].apply(lambda x: f"{x:.6f}%")
    #row = df[df['Country'] == country].iloc[0]

    def format_row_for_prompt(row):
        # Create a formatted string with colons and percentages
        formatted_row = []
        for col, value in row.items():
            if col != 'Country':  # Exclude 'Country' or format differently if needed
                if isinstance(value, (int, float)):  # Add percentage sign for numeric values
                    value_str = f"{value:.6f}%"
                else:
                    value_str = str(value)
                formatted_row.append(f"{col}: {value_str}")
            else:
                formatted_row.append(f"{col}: {value}")
        return "\n".join(formatted_row)

    # Convert the row to a string format for the prompt
    row = df[df['Country'] == country].iloc[0]
    row_str = format_row_for_prompt(row)
    #row_str = row.to_string(index=True)
    print(row_str)

    simple_prompt = f"""
Here is the data for {theme} in {country}:
{row_str}

Summarize the adverse growth for {theme} in {country}. Highlight any increase or decrease compared to previous years and include the cumulative result.
"""

    prompt = f"""
Here is an example of how to describe adverse growth data for a given country:

Country: Australia
Adverse 1990: -0.43%
Adverse 1991: -1.99%
Adverse 1992: -1.20%
Adverse Cumulative: -3.57%
Topic: GDP
Description:
In the adverse scenario, the GDP growth in Australia was -0.43% in 1990. It decreased further to -1.99% in 1991, showing worsening conditions. There was a slight improvement to -1.20% in 1992. The total cumulative adverse growth is -3.57%.

Now, using the following data for {theme} in {country}, describe the adverse growth:
{row_str}
Topic: {theme}

Describe, using the similar pattern from the example, the changes for the provided country and years. Highlight how the values change year by year and whether they increased or decreased. Do not mention any other countries or years, and describe exactly what is in the table. Keep the description simple and direct.
"""

    prompt = f"""
Here is an example of how to describe adverse growth data for a given country:

Country: Australia
Adverse 1990: -0.43%
Adverse 1991: -1.99%
Adverse 1992: -1.20%
Adverse Cumulative: -3.57%
Topic: GDP
Description:
In the adverse scenario, the GDP growth in Australia was -0.43% in 1990. It worsened to -1.99% in 1991 and slightly improved to -1.20% in 1992. The total cumulative adverse growth was -3.57%.

Now, using the following data for {theme} in {country}, describe the adverse growth:
{row_str}
Topic: {theme}

Describe the adverse growth in the provided data year by year. Ensure the description follows the pattern in the example, highlighting whether the values increased or decreased each year.
"""

    prompt1 = f"""
Here is an example of how to describe adverse growth data for a given country:

Country: Australia
Adverse 2020: -0.43%
Adverse 2021: -1.99%
Adverse 2022: -1.20%
Adverse Cumulative: -3.57%
Topic: GDP
Description:
In the adverse scenario, the GDP growth in Australia was -0.43% in 2020. It decreased further to -1.99% in 2021, showing worsening conditions. However, there was a slight improvement to -1.20% in 2022. The total cumulative adverse growth is -3.57%.

Now, using the following data for {theme} in {country}, perform the following:
1. Highlight how the values change from year to year.
2. Describe whether the values increased or decreased compared to the previous year.
3. Indicate if the changes represent a worsening or improvement, and if this is strong or slight.
4. Include the cumulative result.

Data:
{row_str}
Topic: {theme}

Make sure your description follows the example format and accurately reflects the data.
"""

    # Generate the descriptive text using the model (only the last `prompt` defined above is used)
    result = table_to_text(prompt, max_length=240, temperature=0.7, top_p=0.3, do_sample=False)[0]['generated_text']
    return result
# Global variables
stored_paragraphs_1 = []
stored_paragraphs_2 = []
stored_df1 = []
stored_df2 = []
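
# NOTE: the four callbacks used by the text-analysis tab below
# (process_paragraph_1_sum, process_paragraph_1_sent, process_paragraph_2_sum,
# process_paragraph_2_sent) are referenced but not defined in this file. The helpers
# below are minimal assumed implementations: they recover the selected paragraph from
# the stored globals (dropdown labels have the form "Paragraph N: ...") and reuse the
# summarization and sentiment functions defined above.
def _selected_paragraph(choice, paragraphs):
    # Parse the paragraph index out of a "Paragraph N: ..." label; fall back to the raw choice.
    match = re.match(r'Paragraph (\d+):', choice or '')
    if match and paragraphs:
        return paragraphs[int(match.group(1)) - 1]
    return choice or ""

def process_paragraph_1_sum(choice):
    return summarize_text(_selected_paragraph(choice, stored_paragraphs_1))

def process_paragraph_1_sent(choice):
    return text_to_sentiment(_selected_paragraph(choice, stored_paragraphs_1))

def process_paragraph_2_sum(choice):
    return summarize_text(_selected_paragraph(choice, stored_paragraphs_2))

def process_paragraph_2_sent(choice):
    return text_to_sentiment(_selected_paragraph(choice, stored_paragraphs_2))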
with gr.Blocks() as demo:
    with gr.Tab("Methodology"):
        gr.Markdown("""
## Macro-economy Adverse Scenario Comparison from EBA Reports

This application lets the user compare two reports, either from their text content or from their tables. It is divided into two tabs.

**First Tab: Text Comparisons**
- Select two PDFs. Each PDF's text content is extracted into paragraphs.
- Select a paragraph from one PDF, and find the most similar paragraph from the other PDF.
- For a selected paragraph, compute a summary using the **FinPEGASUS model**.
- For a selected paragraph, compute sentiment analysis: each sentence is classified into three classes (Positive, Negative, Neutral) using two different fine-tuned **FinBERT models**:
    - [ProsusAI/finbert](https://huggingface.co/ProsusAI/finbert)
    - [yiyanghkust/finbert-tone](https://huggingface.co/yiyanghkust/finbert-tone)

**Second Tab: Table Comparisons**
- Select two Excel files and a sheet name.
- For the two selected tables, compute the difference of the cumulative adverse growth rate over their respective three years for the selected sheet name (topic).
- For the selected topic (sheet name), find related sentences in the associated PDF text that mention the topic, and classify them by sentiment.
- For a selected country and topic, describe the adverse growth rate trend over three years using [**google/flan-t5-large**](https://huggingface.co/google/flan-t5-large).
""")
with gr.Tab("Financial Report Text Analysis"): | |
gr.Markdown("## Paragraph Extraction and Analysis on Adverse Macro-Economy Scenarios") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Step 1: Upload and Extract Paragraphs") | |
pdf1 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 1") | |
pdf2 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 2") | |
extract_button = gr.Button("Extract Paragraphs") | |
with gr.Column(): | |
gr.Markdown("### Step 2: Select Paragraphs for Analysis") | |
paragraph_1_dropdown = gr.Dropdown(label="Select Paragraph from PDF 1") | |
paragraph_2_dropdown = gr.Dropdown(label="Select Paragraph from PDF 2") | |
def update_paragraphs(pdf1, pdf2): | |
stored_paragraphs_1, stored_paragraphs_2 = extract_and_paragraph(pdf1, pdf2, True) | |
return [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_1)], \ | |
[f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_2)] | |
extract_button.click(update_paragraphs, inputs=[pdf1, pdf2], outputs=[paragraph_1_dropdown, paragraph_2_dropdown]) | |
        with gr.Row():
            with gr.Column():
                gr.Markdown("### PDF 1 Analysis")
                summarize_btn1 = gr.Button("Summarize Text from PDF 1")
                summary_textbox_1 = gr.Textbox(label="Summary for PDF 1", lines=2)
                sentiment_btn1 = gr.Button("Classify Financial Tone from PDF 1")
                sentiment_textbox_1 = gr.Textbox(label="Tone Classification for PDF 1", lines=1)

                summarize_btn1.click(process_paragraph_1_sum, inputs=paragraph_1_dropdown, outputs=summary_textbox_1)
                sentiment_btn1.click(process_paragraph_1_sent, inputs=paragraph_1_dropdown, outputs=sentiment_textbox_1)

            with gr.Column():
                gr.Markdown("### PDF 2 Analysis")
                summarize_btn2 = gr.Button("Summarize Text from PDF 2")
                summary_textbox_2 = gr.Textbox(label="Summary for PDF 2", lines=2)
                sentiment_btn2 = gr.Button("Classify Financial Tone from PDF 2")
                sentiment_textbox_2 = gr.Textbox(label="Tone Classification for PDF 2", lines=1)

                summarize_btn2.click(process_paragraph_2_sum, inputs=paragraph_2_dropdown, outputs=summary_textbox_2)
                sentiment_btn2.click(process_paragraph_2_sent, inputs=paragraph_2_dropdown, outputs=sentiment_textbox_2)
with gr.Tab("Financial Report Table Analysis"): | |
gr.Markdown("## Excel Data Comparison and Topic Analysis") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Step 1: Upload Excel Files") | |
file1 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 1") | |
file2 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 2") | |
sheet = gr.Dropdown(choices=["GDP", "HICP", "RRE prices", "Unemployment", "CRE prices"], label="Select Sheet") | |
with gr.Column(): | |
gr.Markdown("### Step 2: Select a Country for Adverse Growth Analysis") | |
country_1_dropdown = gr.Dropdown(label="Select Country from Excel File 1") | |
country_2_dropdown = gr.Dropdown(label="Select Country from Excel File 2") | |
        with gr.Row():
            gr.Markdown("### Step 3: Compare Data and Generate Reports")
            comparison_button = gr.Button("Compare Data")
            plot_output = gr.Image(label="Difference in Adverse Cumulative Growth")
            text_result_df1 = gr.Textbox(label="Adverse Growth Report for Excel File 1", lines=4)
            text_result_df2 = gr.Textbox(label="Adverse Growth Report for Excel File 2", lines=4)

        # process_and_compare expects (file1, sheet1, file2, sheet2) and returns the plot path plus
        # the country choices for the two dropdowns; there is a single sheet selector, so its value
        # is passed for both sheet arguments.
        comparison_button.click(
            fn=lambda f1, s, f2: process_and_compare(f1, s, f2, s),
            inputs=[file1, sheet, file2],
            outputs=[plot_output, country_1_dropdown, country_2_dropdown],
        )
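
        # Assumed wiring, not present in the original file: process_pdfs_and_analyze_sentiment and
        # generate_text are defined above but never connected to the UI. The sketch below hooks them
        # to the existing components; the names introduced here (sentiment_result_1, sentiment_result_2,
        # analyze_button, generate_button) are illustrative.
        with gr.Row():
            sentiment_result_1 = gr.HighlightedText(label="Topic Sentiment for PDF 1")
            sentiment_result_2 = gr.HighlightedText(label="Topic Sentiment for PDF 2")

        analyze_button = gr.Button("Analyze Topic Sentiment in Associated PDFs")
        analyze_button.click(
            fn=process_pdfs_and_analyze_sentiment,
            inputs=[file1, file2, sheet],
            outputs=[sentiment_result_1, sentiment_result_2],
        )

        generate_button = gr.Button("Generate Adverse Growth Reports")
        generate_button.click(
            fn=lambda country, theme: generate_text(stored_df1, country, theme),
            inputs=[country_1_dropdown, sheet],
            outputs=text_result_df1,
        )
        generate_button.click(
            fn=lambda country, theme: generate_text(stored_df2, country, theme),
            inputs=[country_2_dropdown, sheet],
            outputs=text_result_df2,
        )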

demo.launch()