Spaces:
Running
Running
import os | |
import gradio as gr | |
from transformers import pipeline | |
import spacy | |
import lib.read_pdf | |
import pandas as pd | |
import re | |
import matplotlib.pyplot as plt | |
import matplotlib.patches as patches | |
import io | |
# Initialize spaCy model
# The English pipeline is loaded once at import time; the resulting `nlp`
# object is shared by split_in_sentences() below.
nlp = spacy.load('en_core_web_sm')
# Append spaCy's rule-based sentence splitter to the loaded pipeline.
# NOTE(review): en_core_web_sm presumably already sets sentence boundaries
# through its parser -- confirm this extra component is actually needed.
nlp.add_pipe('sentencizer')
def split_in_sentences(text):
    """Segment ``text`` with the shared spaCy pipeline.

    Returns a list with one whitespace-trimmed string per detected sentence.
    """
    sentences = []
    for sentence in nlp(text).sents:
        sentences.append(str(sentence).strip())
    return sentences
def make_spans(text, results):
    """Pair every sentence of ``text`` with the label of the matching result.

    ``results`` is a sequence of mappings that each carry a ``'label'`` key.
    The function returns a list of ``(sentence, label)`` tuples; pairing stops
    at the shorter of the two sequences.
    """
    labels = []
    for result in results:
        labels.append(result['label'])
    sentences = split_in_sentences(text)
    return list(zip(sentences, labels))
# Initialize pipelines
# All four Hugging Face pipelines are created once at import time and reused
# by the helper functions defined below.
# Summarization pipeline called by summarize_text().
summarizer = pipeline("summarization", model="human-centered-summarization/financial-summarization-pegasus")
# Classifier called by text_to_sentiment() and fin_ext().
fin_model = pipeline("sentiment-analysis", model='yiyanghkust/finbert-tone', tokenizer='yiyanghkust/finbert-tone')
# Second classifier, called by fin_ext_bis().
fin_model_bis = pipeline("sentiment-analysis", model='ProsusAI/finbert', tokenizer='ProsusAI/finbert')
# Text-to-text generation pipeline prompted by generate_text().
table_to_text = pipeline('text2text-generation', model='google/flan-t5-large')
def summarize_text(text):
    """Run ``text`` through the summarization pipeline and return the summary string."""
    first_output = summarizer(text)[0]
    return first_output['summary_text']
def text_to_sentiment(text):
    """Classify ``text`` as a whole with ``fin_model`` and return the predicted label."""
    predictions = fin_model(text)
    top_prediction = predictions[0]
    return top_prediction["label"]
def fin_ext(text):
    """Label every sentence of ``text`` with the ``fin_model`` classifier.

    Args:
        text: Free text to analyse.

    Returns:
        A list of ``(sentence, label)`` tuples, one per detected sentence.
        An empty list is returned when no sentence is found.
    """
    # Segment once and reuse the result for both the classifier call and the
    # pairing, instead of parsing the same text a second time.
    sentences = split_in_sentences(text)
    if not sentences:
        # Nothing to classify; avoid calling the model with an empty batch.
        return []
    results = fin_model(sentences)
    return [(sentence, result['label']) for sentence, result in zip(sentences, results)]
def fin_ext_bis(text):
    """Label every sentence of ``text`` with the ``fin_model_bis`` classifier.

    Args:
        text: Free text to analyse.

    Returns:
        A list of ``(sentence, label)`` tuples, one per detected sentence.
        An empty list is returned when no sentence is found.
    """
    # Segment once and reuse the result for both the classifier call and the
    # pairing, instead of parsing the same text a second time.
    sentences = split_in_sentences(text)
    if not sentences:
        # Nothing to classify; avoid calling the model with an empty batch.
        return []
    results = fin_model_bis(sentences)
    return [(sentence, result['label']) for sentence, result in zip(sentences, results)]
def extract_and_paragraph(pdf1, pdf2, paragraph):
    """Extract the section of interest from two PDF reports.

    Both file names are resolved inside ``PDF_FOLDER``. For each document the
    text is extracted, cut to the range that starts at "Main risks to" and
    ends at the first matching end keyword, and, when ``paragraph`` is truthy,
    split into paragraphs.

    Returns a pair ``(result_for_pdf1, result_for_pdf2)``; ``([], [])`` when
    either file name is missing.
    """
    if not (pdf1 and pdf2):
        return [], []

    start_keyword = "Main risks to"
    end_keywords = ["4. Appendix", "Annex:", "4. Annex", "Detailed tables", "ACKNOWLEDGEMENTS", "STATISTICAL ANNEX", "PROSPECTS BY MEMBER STATES"]

    # Each stage is applied to PDF 1 first and PDF 2 second.
    pdf_paths = [os.path.join(PDF_FOLDER, file_name) for file_name in (pdf1, pdf2)]
    # Extract and format paragraphs
    documents = [lib.read_pdf.extract_and_format_paragraphs(path) for path in pdf_paths]
    bounds = [lib.read_pdf.find_text_range(document, start_keyword, end_keywords) for document in documents]
    documents = [
        lib.read_pdf.extract_relevant_text(document, start_index, end_index)
        for document, (start_index, end_index) in zip(documents, bounds)
    ]
    if paragraph:
        documents = [lib.read_pdf.split_text_into_paragraphs(document, 0) for document in documents]
    return documents[0], documents[1]
# Gradio interface setup
# Relative folder that holds both the PDF reports and the Excel workbooks;
# every file name chosen in the UI is joined onto this path.
PDF_FOLDER = "data"
def get_pdf_files(folder):
    """Return the names of the ``.pdf`` files in ``folder``, sorted alphabetically.

    ``os.listdir`` yields entries in arbitrary order, so the names are sorted
    to keep the dropdown choices stable from one run to the next.
    """
    return sorted(name for name in os.listdir(folder) if name.endswith('.pdf'))
def show(name):
    """Return ``name`` rendered as a string (used to echo a value into a text component)."""
    return "{}".format(name)
def get_excel_files(folder):
    """Return the names of the ``.xlsx`` files in ``folder``, sorted alphabetically.

    ``os.listdir`` yields entries in arbitrary order, so the names are sorted
    to keep the dropdown choices stable from one run to the next.
    """
    return sorted(name for name in os.listdir(folder) if name.endswith('.xlsx'))
def get_sheet_names(file):
    """Return a Gradio update whose choices are the sheet names of a workbook.

    Args:
        file: Name of an Excel workbook located in ``PDF_FOLDER``.

    Returns:
        ``gr.update(choices=...)`` carrying the workbook's sheet names.
    """
    workbook_path = os.path.join(PDF_FOLDER, file)
    # Open the workbook in a context manager so its file handle is released
    # as soon as the sheet names have been read.
    with pd.ExcelFile(workbook_path) as xls:
        sheet_names = xls.sheet_names
    return gr.update(choices=sheet_names)
def process_and_compare(file1, sheet1, file2, sheet2):
    """Compare the cumulative adverse figures of two Excel workbooks and plot the gap.

    Each workbook is read from ``PDF_FOLDER``. The report year is taken from
    the first four-digit number in the file name and is used to build the
    expected column names. The per-country difference
    ``file2 - file1`` of the 'Adverse Cumulative' column is drawn as a bar
    chart and saved to ``output/plot.png``.

    Side effects:
        Rebinds the module globals ``stored_df1`` and ``stored_df2`` to the
        column subset (country, three adverse years, cumulative) of each file.

    Args:
        file1: Name of the first workbook (must contain a four-digit year).
        sheet1: Sheet to read from the first workbook.
        file2: Name of the second workbook (must contain a four-digit year).
        sheet2: Sheet to read from the second workbook.

    Returns:
        A tuple ``(plot_path, update_1, update_2)`` where the two updates set
        the country dropdown choices for file 1 and file 2 respectively.

    Raises:
        ValueError: If a file name holds no four-digit year, or a sheet does
            not have the expected number of columns.
    """
    def extract_year(file_name):
        """Return the first four-digit number found in ``file_name`` as an int."""
        match = re.search(r'(\d{4})', file_name)
        if match is None:
            raise ValueError(f"Could not find a four-digit year in file name {file_name!r}.")
        return int(match.group(1))

    def process_file(file_path, sheet_name):
        """Load one sheet, rename its columns and return (full frame, adverse subset)."""
        # Extract year from file name
        year = extract_year(file_path)
        # Load the Excel file
        df = pd.read_excel(os.path.join(PDF_FOLDER, file_path), sheet_name=sheet_name, index_col=0)
        # Define expected columns based on extracted year
        historical_col = f'Historical {year - 1}'
        baseline_cols = [f'Baseline {year}', f'Baseline {year + 1}', f'Baseline {year + 2}']
        adverse_cols = [f'Adverse {year}', f'Adverse {year + 1}', f'Adverse {year + 2}']
        level_deviation_col = f'Level Deviation {year + 2}'
        # Drop rows and reset index
        df = df.iloc[4:].reset_index(drop=True)
        # Define the new column names
        new_columns = ['Country', 'Code', historical_col] + baseline_cols + adverse_cols + ['Adverse Cumulative', 'Adverse Minimum', level_deviation_col]
        # Ensure the number of columns matches
        if len(df.columns) != len(new_columns):
            raise ValueError(f"Expected {len(new_columns)} columns, but found {len(df.columns)} columns in the data.")
        df.columns = new_columns
        columns = ['Country'] + adverse_cols + ['Adverse Cumulative']
        return df, df[columns]

    # Process both files
    global stored_df1, stored_df2
    df1, stored_df1 = process_file(file1, sheet1)
    df2, stored_df2 = process_file(file2, sheet2)
    year1 = extract_year(file1)
    year2 = extract_year(file2)

    # Merge only the columns that are needed, with file 1 on the left, and use
    # fixed suffixes: year-based suffixes were attached to the wrong frame and
    # collide when both files are from the same year.
    merged_df = pd.merge(
        df1[['Country', 'Adverse Cumulative']],
        df2[['Country', 'Adverse Cumulative']],
        on='Country',
        suffixes=('_file1', '_file2'),
    )
    # Ensure data types are correct before subtracting
    merged_df['Country'] = merged_df['Country'].astype(str)
    cumulative_1 = pd.to_numeric(merged_df['Adverse Cumulative_file1'], errors='coerce')
    cumulative_2 = pd.to_numeric(merged_df['Adverse Cumulative_file2'], errors='coerce')
    # Matches the plot title: value of the second file minus value of the first.
    merged_df['Difference adverse cumulative growth'] = cumulative_2 - cumulative_1

    # Create histogram plot with color coding
    fig, ax = plt.subplots(figsize=(12, 8))
    palette = plt.get_cmap('tab20').colors  # Use a colormap with multiple colors
    num_countries = len(merged_df)
    # Cycle through the palette so that every country gets a colour and a
    # legend entry even when there are more countries than palette colours.
    colors = [palette[i % len(palette)] for i in range(num_countries)]
    ax.bar(merged_df['Country'], merged_df['Difference adverse cumulative growth'], color=colors)
    # Add a legend
    handles = [patches.Patch(color=color, label=country) for color, country in zip(colors, merged_df['Country'])]
    ax.legend(handles=handles, title='Countries', bbox_to_anchor=(1.05, 1), loc='upper left')
    ax.set_title(f'Histogram of Difference between Adverse cumulative growth of {year2} and {year1} for {sheet1}')
    ax.set_xlabel('Country')
    ax.set_ylabel('Difference')
    plt.xticks(rotation=90)

    # Save plot to a file
    file_path = 'output/plot.png'
    # The output folder may not exist on a fresh checkout.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    fig.savefig(file_path, format='png', bbox_inches='tight')
    plt.close(fig)
    return (
        file_path,
        gr.update(choices=stored_df1['Country'].tolist()),
        gr.update(choices=stored_df2['Country'].tolist()),
    )
def find_sentences_with_keywords(text, keywords):
    """Return the sentences of ``text`` that mention any of ``keywords``.

    Sentences are split on whitespace that follows ``.``, ``!`` or ``?``.
    A keyword matches only as a whole word, case-insensitively.

    Args:
        text: Text to search. Empty or missing text yields an empty list.
        keywords: Iterable of keyword strings. No keywords yields an empty list.

    Returns:
        The matching sentences in document order, each listed once.
    """
    if not text or not keywords:
        return []
    # One alternation pattern covers every keyword, so each sentence is
    # scanned a single time.
    alternatives = '|'.join(re.escape(keyword) for keyword in keywords)
    keyword_pattern = re.compile(rf'\b(?:{alternatives})\b', re.IGNORECASE)
    # Split text into sentences using regular expression to match sentence-ending punctuation
    sentences = re.split(r'(?<=[.!?])\s+', text)
    # dict.fromkeys removes duplicates while keeping first-seen order; a plain
    # set would return the sentences in an arbitrary, run-dependent order.
    return list(dict.fromkeys(
        sentence for sentence in sentences if keyword_pattern.search(sentence)
    ))
# Main function to process both PDFs based on the Excel file names and the sheet name
def process_pdfs_and_analyze_sentiment(file1, file2, sheet):
    """Run sentence-level tone analysis on the PDF passages related to ``sheet``.

    The PDF names are derived from the Excel names by swapping the ``.xlsx``
    extension for ``.pdf``. From each extracted text only the sentences that
    mention a keyword mapped to ``sheet`` are kept; they are joined with
    newlines and passed to ``fin_ext_bis``.

    Returns a pair ``(result_for_pdf1, result_for_pdf2)``.
    """
    # Extract text from both PDFs based on the file name
    text1, text2 = extract_and_paragraph(
        file1.replace(".xlsx", ".pdf"),
        file2.replace(".xlsx", ".pdf"),
        False,
    )
    # Use sheet name as the keyword to find relevant sentences
    keywords_by_sheet = {
        'GDP': ['GDP'],
        'HICP': ['HICP'],
        'RRE prices': ['RRE', 'residential'],
        'CRE prices': ['CRE', 'commercial'],
        'Unemployment': ['unemployment']
    }
    selected_keywords = keywords_by_sheet.get(sheet, [])
    # Find sentences containing the keywords, first document first
    matched = [find_sentences_with_keywords(document, selected_keywords) for document in (text1, text2)]
    # Concatenate all sentences for each PDF
    joined = ["\n".join(sentences) for sentences in matched]
    # Perform sentiment analysis on the extracted sentences for each PDF
    result_pdf1 = fin_ext_bis(joined[0])
    result_pdf2 = fin_ext_bis(joined[1])
    return result_pdf1, result_pdf2
#def change_choices(df): | |
# return gr.update(choices=df.Country.values.tolist()) | |
def generate_text(df, country, theme):
    """Generate a short description of one country's adverse figures.

    The row of ``df`` whose 'Country' equals ``country`` is rendered as
    ``column: value`` lines (numeric values get six decimals and a ``%``
    sign), embedded in a one-shot prompt and sent to the ``table_to_text``
    pipeline.

    Args:
        df: DataFrame with a 'Country' column plus the value columns to describe.
        country: Country whose row should be described.
        theme: Topic name inserted in the prompt (for example the sheet name).

    Returns:
        The generated text, or an explanatory message when no table is loaded
        or ``country`` is not present in it.
    """
    def format_row_for_prompt(row):
        """Render a row as one ``column: value`` line per column."""
        lines = []
        for col, value in row.items():
            # Add percentage sign for numeric values; 'Country' is kept as is
            if col != 'Country' and isinstance(value, (int, float)):
                value = f"{value:.6f}%"
            lines.append(f"{col}: {value}")
        return "\n".join(lines)

    # The table is only available after a comparison has been run; until then
    # the caller may pass a placeholder that is not a DataFrame.
    if not isinstance(df, pd.DataFrame) or 'Country' not in df.columns:
        return "No table data is loaded yet. Run the data comparison first."
    matching_rows = df[df['Country'] == country]
    if matching_rows.empty:
        return f"No data found for country {country!r}."

    # Convert the row to a string format for prompt
    row_str = format_row_for_prompt(matching_rows.iloc[0])
    print(row_str)
    prompt = f"""
Here is an example of how to describe adverse growth data for a given country:
Country: Australia
Adverse 1990: -0.43%
Adverse 1991: -1.99%
Adverse 1992: -1.20%
Adverse Cumulative: -3.57%
Topic: GDP
Description:
In the adverse scenario, the GDP growth in Australia was -0.43% in 1990. It worsened to -1.99% in 1991 and slightly improved to -1.20% in 1992. The total cumulative adverse growth was -3.57%.
Now, using the following data for {theme} in {country}, describe the adverse growth:
{row_str}
Topic: {theme}
Your task is to describe the changes in the provided data year by year, just like in the example. Focus only on the provided country and values, and do not introduce any new countries or years. Ensure the description follows the pattern in the example, highlighting whether the values increased or decreased each year.
"""
    # Generate the descriptive text using the model
    result = table_to_text(prompt, max_length=240, temperature=0.7, do_sample=True)[0]['generated_text']
    return result
# Global variable
# Module-level state shared between the Gradio callbacks.
# stored_paragraphs_1 / stored_paragraphs_2 are reassigned by
# update_paragraphs() with the paragraphs extracted from the two PDFs.
# stored_df1 / stored_df2 are reassigned by process_and_compare() with the
# per-file column subset and are passed to generate_text() by the
# country-summary buttons.
# NOTE(review): the last two start as empty lists but are later replaced by
# DataFrames -- callers must not assume a DataFrame before a comparison has run.
stored_paragraphs_1 = []
stored_paragraphs_2 = []
stored_df1 = []
stored_df2 = []
def _paragraph_from_choice(choice, paragraphs):
    """Map a dropdown label of the form "Paragraph N: ..." to the full paragraph text.

    Returns an empty string when nothing is selected, the label cannot be
    parsed, or N falls outside ``paragraphs``.
    """
    if not choice:
        return ""
    match = re.match(r"Paragraph (\d+):", str(choice))
    if match is None:
        return ""
    index = int(match.group(1)) - 1
    if 0 <= index < len(paragraphs):
        return paragraphs[index]
    return ""


def _make_paragraph_handler(analysis_fn, pdf_number):
    """Build a callback that applies ``analysis_fn`` to the paragraph selected for one PDF."""
    def handler(choice):
        # Read the globals at call time: update_paragraphs() rebinds them.
        paragraphs = stored_paragraphs_1 if pdf_number == 1 else stored_paragraphs_2
        paragraph = _paragraph_from_choice(choice, paragraphs)
        # With nothing selected, leave the output empty rather than sending
        # an empty string to a model.
        return analysis_fn(paragraph) if paragraph else None
    return handler


# Callbacks for the paragraph buttons. The buttons referred to these names,
# but the file did not define them, so every click raised NameError.
process_paragraph_1_sum = _make_paragraph_handler(summarize_text, 1)
process_paragraph_1_sent = _make_paragraph_handler(text_to_sentiment, 1)
process_paragraph_1_sent_tone = _make_paragraph_handler(fin_ext, 1)
process_paragraph_1_sent_tone_bis = _make_paragraph_handler(fin_ext_bis, 1)
process_paragraph_2_sum = _make_paragraph_handler(summarize_text, 2)
process_paragraph_2_sent = _make_paragraph_handler(text_to_sentiment, 2)
process_paragraph_2_sent_tone = _make_paragraph_handler(fin_ext, 2)
process_paragraph_2_sent_tone_bis = _make_paragraph_handler(fin_ext_bis, 2)
# Callbacks that fill the "Selected Paragraph N Content" boxes with the full paragraph.
show_paragraph_1 = _make_paragraph_handler(show, 1)
show_paragraph_2 = _make_paragraph_handler(show, 2)

# NOTE(review): the original indentation of this layout was not available;
# the nesting below (in particular the placement of the two buttons in the
# second tab) is a reconstruction -- confirm against the intended layout.
with gr.Blocks() as demo:
    with gr.Tab("Financial Report Text Analysis"):
        gr.Markdown("## Financial Report Paragraph Selection and Analysis on adverse macro-economy scenario")
        with gr.Row():
            # Upload PDFs
            with gr.Column():
                pdf1 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 1")
                pdf2 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 2")
            with gr.Column():
                b1 = gr.Button("Extract and Display Paragraphs")
                paragraph_1_dropdown = gr.Dropdown(label="Select Paragraph from PDF 1")
                paragraph_2_dropdown = gr.Dropdown(label="Select Paragraph from PDF 2")

                def update_paragraphs(pdf1, pdf2):
                    """Extract the paragraphs of both PDFs and refresh the two paragraph dropdowns."""
                    global stored_paragraphs_1, stored_paragraphs_2
                    stored_paragraphs_1, stored_paragraphs_2 = extract_and_paragraph(pdf1, pdf2, True)
                    updated_dropdown_1 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_1)]
                    updated_dropdown_2 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_2)]
                    return gr.update(choices=updated_dropdown_1), gr.update(choices=updated_dropdown_2)

                b1.click(fn=update_paragraphs, inputs=[pdf1, pdf2], outputs=[paragraph_1_dropdown, paragraph_2_dropdown])
        with gr.Row():
            # Process the selected paragraph from PDF 1
            with gr.Column():
                gr.Markdown("### PDF 1 Analysis")
                selected_paragraph_1 = gr.Textbox(label="Selected Paragraph 1 Content", lines=4)
                # Listen on the dropdown (the input), not on the textbox that is written to.
                paragraph_1_dropdown.change(fn=show_paragraph_1, inputs=paragraph_1_dropdown, outputs=selected_paragraph_1)
                summarize_btn1 = gr.Button("Summarize Text from PDF 1")
                summary_textbox_1 = gr.Textbox(label="Summary for PDF 1", lines=2)
                summarize_btn1.click(fn=process_paragraph_1_sum, inputs=paragraph_1_dropdown, outputs=summary_textbox_1)
                sentiment_btn1 = gr.Button("Classify Financial Tone from PDF 1")
                sentiment_textbox_1 = gr.Textbox(label="Classification for PDF 1", lines=1)
                sentiment_btn1.click(fn=process_paragraph_1_sent, inputs=paragraph_1_dropdown, outputs=sentiment_textbox_1)
                analyze_btn1 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone")
                fin_spans_1 = gr.HighlightedText(label="Financial Tone Analysis for PDF 1")
                analyze_btn1.click(fn=process_paragraph_1_sent_tone, inputs=paragraph_1_dropdown, outputs=fin_spans_1)
                analyze_btn1_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert")
                fin_spans_1_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 1 bis")
                analyze_btn1_.click(fn=process_paragraph_1_sent_tone_bis, inputs=paragraph_1_dropdown, outputs=fin_spans_1_)
            # Process the selected paragraph from PDF 2
            with gr.Column():
                gr.Markdown("### PDF 2 Analysis")
                selected_paragraph_2 = gr.Textbox(label="Selected Paragraph 2 Content", lines=4)
                # Listen on the dropdown (the input), not on the textbox that is written to.
                paragraph_2_dropdown.change(fn=show_paragraph_2, inputs=paragraph_2_dropdown, outputs=selected_paragraph_2)
                summarize_btn2 = gr.Button("Summarize Text from PDF 2")
                summary_textbox_2 = gr.Textbox(label="Summary for PDF 2", lines=2)
                summarize_btn2.click(fn=process_paragraph_2_sum, inputs=paragraph_2_dropdown, outputs=summary_textbox_2)
                sentiment_btn2 = gr.Button("Classify Financial Tone from PDF 2")
                sentiment_textbox_2 = gr.Textbox(label="Classification for PDF 2", lines=1)
                sentiment_btn2.click(fn=process_paragraph_2_sent, inputs=paragraph_2_dropdown, outputs=sentiment_textbox_2)
                analyze_btn2 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone")
                fin_spans_2 = gr.HighlightedText(label="Financial Tone Analysis for PDF 2")
                analyze_btn2.click(fn=process_paragraph_2_sent_tone, inputs=paragraph_2_dropdown, outputs=fin_spans_2)
                analyze_btn2_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert")
                fin_spans_2_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 2 bis")
                analyze_btn2_.click(fn=process_paragraph_2_sent_tone_bis, inputs=paragraph_2_dropdown, outputs=fin_spans_2_)
    with gr.Tab("Financial Report Table Analysis"):
        # New tab content goes here
        gr.Markdown("## Excel Data Comparison")
        with gr.Row():
            with gr.Column():
                file1 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 1")
                file2 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 2")
                sheet = gr.Dropdown(choices=["GDP", "HICP", "RRE prices", "Unemployment", "CRE prices"], label="Select Sheet for File 1 and 2")
            with gr.Column():
                result = gr.Image(label="Comparison pLot")

                # Not wired to any component in this file; kept for callers that may use it.
                def update_sheets(file):
                    """Return a dropdown update listing the sheets of ``file``."""
                    return get_sheet_names(file)

                b1 = gr.Button("Compare Data")
                b2 = gr.Button("Extract text information")
        with gr.Row():
            with gr.Column():
                sentiment_results_pdf1 = gr.HighlightedText(label="Sentiment Analysis - PDF 1")
                country_1_dropdown = gr.Dropdown(label="Select Country from Excel File 1")
                summarize_btn1_country = gr.Button("Summary for the selected country")
                text_result_df1 = gr.Textbox(label="Sentence for excel file 1", lines=2)
                # The lambda looks up stored_df1 at click time, after process_and_compare() has rebound it.
                summarize_btn1_country.click(fn=lambda country, theme: generate_text(stored_df1, country, theme),
                                             inputs=[country_1_dropdown, sheet],
                                             outputs=text_result_df1)
            with gr.Column():
                sentiment_results_pdf2 = gr.HighlightedText(label="Sentiment Analysis - PDF 2")
                country_2_dropdown = gr.Dropdown(label="Select Country from Excel File 2")
                summarize_btn2_country = gr.Button("Summary for the selected country")
                text_result_df2 = gr.Textbox(label="Sentence for excel file 2", lines=2)
                # The lambda looks up stored_df2 at click time, after process_and_compare() has rebound it.
                summarize_btn2_country.click(fn=lambda country, theme: generate_text(stored_df2, country, theme),
                                             inputs=[country_2_dropdown, sheet],
                                             outputs=text_result_df2)
        # Button to extract text from PDFs and perform sentiment analysis
        b1.click(fn=process_and_compare, inputs=[file1, sheet, file2, sheet], outputs=[result, country_1_dropdown, country_2_dropdown])
        b2.click(fn=process_pdfs_and_analyze_sentiment, inputs=[file1, file2, sheet], outputs=[sentiment_results_pdf1, sentiment_results_pdf2])
demo.launch()