# Spaces status banner captured with this file: "Runtime error" (shown twice).
import gradio as gr | |
import openpyxl | |
import csv | |
from sentence_transformers import SentenceTransformer | |
from sklearn.metrics.pairwise import cosine_similarity | |
import numpy as np | |
import tempfile | |
import os | |
import pandas as pd | |
import re | |
# Sentence-transformer model used for all embeddings; built once at import time.
_EMBEDDING_MODEL_NAME = 'BAAI/bge-small-en-v1.5'
model = SentenceTransformer(_EMBEDDING_MODEL_NAME)
def filter_excel1(excel_path, min_row, max_row):
    """Extract (category, diagnostic statement) pairs from the first worksheet.

    For every row in ``min_row``..``max_row`` the category is read from the
    second cell (index 1) and the diagnostic statement from the sixth cell
    (index 5).  A blank category inherits the most recent non-blank category
    above it.  Rows without a diagnostic statement are skipped, because they
    carry no text to compare against (e.g. rows past the end of the data when
    ``max_row`` overshoots the sheet).

    Args:
        excel_path: Path or file object accepted by ``openpyxl.load_workbook``.
        min_row: First row to read (1-based); floats are truncated to int.
        max_row: Last row to read (inclusive); floats are truncated to int.

    Returns:
        A list whose first item is the header
        ``["category", "diagnostic_statement"]`` followed by one
        ``[category, diagnostic_statement]`` list per kept row.

    Raises:
        gr.Error: If the workbook cannot be read or a row is too short.
    """
    try:
        # Numeric UI inputs may arrive as floats; openpyxl needs int bounds.
        first_row = None if min_row is None else int(min_row)
        last_row = None if max_row is None else int(max_row)

        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook.worksheets[0]

        data = [["category", "diagnostic_statement"]]
        prev_category = None
        for row in sheet.iter_rows(min_row=first_row, max_row=last_row):
            category = row[1].value
            diagnostic_statement = row[5].value

            # Forward-fill: remember the last non-blank category and reuse it.
            # This runs before the skip below so that a row holding only a
            # category still updates the carried value.
            if category:
                prev_category = category
            else:
                category = prev_category

            if diagnostic_statement is None or not str(diagnostic_statement).strip():
                continue
            data.append([category, diagnostic_statement])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 1: {str(e)}") from e
def filter_excel2(excel_path, min_row, max_row, sheetname):
    """Read (description, category) pairs from the named worksheet.

    The description is taken from the first cell (index 0) and the category
    from the seventh cell (index 6) of each row in ``min_row``..``max_row``.
    Any category that is not a string is normalised to ``"#N/A"`` so that
    downstream code has a single "missing" marker.  Rows with an empty
    description are left out.

    Args:
        excel_path: Path or file object accepted by ``openpyxl.load_workbook``.
        min_row: First row to read (1-based); floats are truncated to int.
        max_row: Last row to read (inclusive); floats are truncated to int.
        sheetname: Name of the worksheet to read.

    Returns:
        A list whose first item is the header ``["description", "category"]``
        followed by one ``[description, category]`` list per kept row.

    Raises:
        gr.Error: If the workbook/sheet cannot be read or a row is too short.
    """
    try:
        # Numeric UI inputs may arrive as floats; openpyxl needs int bounds.
        first_row = None if min_row is None else int(min_row)
        last_row = None if max_row is None else int(max_row)

        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook[sheetname]

        data = [["description", "category"]]
        for row in sheet.iter_rows(min_row=first_row, max_row=last_row):
            description = row[0].value
            category = row[6].value
            # Only string categories are kept; numbers, blanks, etc. all
            # collapse to the "#N/A" marker (a literal "#N/A" stays as is).
            if not isinstance(category, str):
                category = "#N/A"
            if description:
                data.append([description, category])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 2: {str(e)}") from e
def sheet_lookup(current_sheet_name, excel_file_path):
    """Fill the current quarter's mapping column from the previous quarter's sheet.

    ``current_sheet_name`` must start with ``<digit>Q<4-digit year>`` (for
    example ``2Q2024``).  The previous quarter's sheet name is derived from
    it; when that sheet exists, each row's ``"<q>q CRI Profile Mapping"``
    value is looked up by ``'Monitoring Tool Instance ID-AU'`` in the previous
    sheet's ``"<prev q>q CRI Profile Mapping"`` column.  IDs without a match,
    or every row when the previous sheet is missing, get ``'#N/A'``.  The
    current sheet is then replaced in ``excel_file_path``.

    NOTE(review): when the previous sheet exists, rows of the *current* sheet
    that repeat a lookup ID are dropped before the sheet is written back.
    This is kept as-is; confirm it is intended.

    Args:
        current_sheet_name: Name of the sheet to fill.
        excel_file_path: Path of the workbook, which is modified in place.

    Raises:
        ValueError: If ``current_sheet_name`` does not match the expected
            format.  This is checked before the file is opened.
    """
    # Validate the name first so a bad name fails fast, before any file I/O.
    match = re.match(r'(\d)Q(\d{4})', current_sheet_name)
    if not match:
        raise ValueError("Invalid sheet name format")

    quarter, year = map(int, match.groups())
    prev_quarter = 4 if quarter == 1 else quarter - 1
    prev_year = year - 1 if quarter == 1 else year
    prev_sheet_name = f"{prev_quarter}Q{prev_year}"
    result_col = f"{quarter}q CRI Profile Mapping"

    # Read everything needed inside a context manager so the workbook handle
    # is closed before the same file is reopened for appending below.
    with pd.ExcelFile(excel_file_path) as xl:
        current_df = xl.parse(current_sheet_name)
        prev_df = xl.parse(prev_sheet_name) if prev_sheet_name in xl.sheet_names else None

    if prev_df is not None:
        lookup_col = 'Monitoring Tool Instance ID-AU'
        current_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True)
        prev_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True)
        value_col = f"{prev_quarter}q CRI Profile Mapping"

        # Dictionary keyed by ID for the lookup.
        lookup_dict = dict(zip(prev_df[lookup_col], prev_df[value_col]))
        current_df[result_col] = current_df[lookup_col].map(lookup_dict).fillna('#N/A')
    else:
        # No previous quarter to look up from: mark every row as missing.
        current_df[result_col] = '#N/A'
        print(f"Warning: Previous sheet {prev_sheet_name} not found. Filling {result_col} with '#N/A'")

    # Save the results back to the Excel file, replacing the current sheet.
    with pd.ExcelWriter(excel_file_path, mode='a', if_sheet_exists='replace') as writer:
        current_df.to_excel(writer, sheet_name=current_sheet_name, index=False)
    print(f"Processing complete for sheet {current_sheet_name}")
def get_embeddings(texts):
    """Encode ``texts`` with the module-level ``model``.

    Returns whatever ``model.encode`` produces for the given input.
    """
    embeddings = model.encode(texts)
    return embeddings
def get_top_n_categories(query_embedding, statement_embeddings, categories, n=3):
    """Return the categories of the ``n`` statements most similar to the query.

    Similarity is the cosine similarity between ``query_embedding`` and each
    row of ``statement_embeddings``; ``categories[i]`` is the category of
    statement ``i``.  Results are ordered from most to least similar.  The
    same category can appear more than once when several of the top
    statements share it.

    Args:
        query_embedding: Embedding vector of the query text.
        statement_embeddings: One embedding per reference statement.
        categories: Category of each reference statement, in the same order.
        n: Maximum number of categories to return.

    Returns:
        A list of at most ``n`` categories; empty when ``n`` is not positive.
    """
    # A slice like [-0:] selects the whole array, so without this guard
    # n=0 would return every category instead of none.
    if n <= 0:
        return []
    similarities = cosine_similarity([query_embedding], statement_embeddings)[0]
    # Descending order first, then take the leading n indices.
    top_indices = np.argsort(similarities)[::-1][:n]
    return [categories[i] for i in top_indices]
def process_data(csv1_data, csv2_data):
    """Fill missing categories in ``csv2_data`` using similarity to ``csv1_data``.

    ``csv1_data`` holds ``[category, diagnostic_statement]`` rows and
    ``csv2_data`` holds ``[description, category]`` rows; both start with a
    header row that is ignored.  For every data row of ``csv2_data``:

    * if the same description was already seen, its category is reused;
    * else if the row already has a category other than ``"#N/A"``, it is kept;
    * otherwise the description is embedded and the categories of the most
      similar diagnostic statements are joined with ``", "``.

    ``csv2_data`` is modified in place and also returned.

    Args:
        csv1_data: Reference rows (header first).
        csv2_data: Rows to complete (header first).

    Returns:
        The same ``csv2_data`` list with category cells filled in.

    Raises:
        gr.Error: If embedding or the similarity search fails, or if a lookup
            is needed but there are no reference statements.
    """
    try:
        # Reference rows without statement text give nothing to match against,
        # so they are not offered as candidates.
        reference = [
            (row[0], row[1])
            for row in csv1_data[1:]
            if row[1] is not None and str(row[1]).strip()
        ]
        categories = [category for category, _ in reference]
        diagnostic_statements = [statement for _, statement in reference]

        # Computed on first use: no embedding work when every row already
        # has a category.
        statement_embeddings = None
        # description -> category decided for its first occurrence.
        resolved = {}

        for row in csv2_data[1:]:
            description = row[0]
            if description in resolved:
                row[1] = resolved[description]
                continue

            if row[1] == "#N/A":
                if statement_embeddings is None:
                    if not diagnostic_statements:
                        raise ValueError("No diagnostic statements available for similarity search")
                    statement_embeddings = get_embeddings(diagnostic_statements)
                description_embedding = get_embeddings([description])[0]
                top_categories = get_top_n_categories(description_embedding, statement_embeddings, categories)
                row[1] = ', '.join(top_categories)

            resolved[description] = row[1]
        return csv2_data
    except Exception as e:
        raise gr.Error(f"Error processing data: {str(e)}") from e
def _to_row_number(value):
    """Convert a row bound coming from the UI to ``int``; ``None`` stays ``None``."""
    return None if value is None else int(value)


def _as_path(uploaded):
    """Return the filesystem path of an upload given as a path or a file-like object."""
    # File-like upload objects expose their path as ``.name``; plain path
    # strings have no such attribute and are returned unchanged.
    return getattr(uploaded, "name", uploaded)


def update_excel(excel_path, processed_data, sheetname, min_row=2, max_row=None):
    """Write processed categories back into the seventh column of a worksheet.

    Rows are visited from ``min_row`` to ``max_row``; rows whose first cell is
    empty are skipped, and the n-th remaining row receives
    ``processed_data[n][1]``.  The row window must therefore match the one
    used to build ``processed_data``, otherwise values land on the wrong rows.

    Args:
        excel_path: Path or file object accepted by ``openpyxl.load_workbook``.
        processed_data: ``[description, category]`` rows *without* header.
        sheetname: Name of the worksheet to update.
        min_row: First row to update (1-based).  Defaults to 2.
        max_row: Last row to update (inclusive); ``None`` lets openpyxl use
            the sheet's own extent.

    Returns:
        The modified, unsaved ``openpyxl`` workbook.

    Raises:
        gr.Error: If the workbook/sheet cannot be read or a row is too short.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook[sheetname]
        idx = 0
        for row in sheet.iter_rows(min_row=_to_row_number(min_row), max_row=_to_row_number(max_row)):
            description_cell = row[0]
            category_cell = row[6]
            if not description_cell.value:
                continue
            try:
                new_category = processed_data[idx][1]
            except IndexError:
                print(f"Warning: Not enough processed data for row {category_cell.row}")
                continue
            sheet.cell(row=category_cell.row, column=category_cell.col_idx, value=new_category)
            idx += 1
        return workbook
    except Exception as e:
        raise gr.Error(f"Error updating Excel: {str(e)}") from e


def process_files(excel1, excel2, min_row1, max_row1, min_row2, max_row2, sheetname):
    """Run the whole pipeline and return the path of the updated workbook.

    Steps: previous-quarter lookup on Excel 2 (which rewrites that file in
    place), extraction of reference statements from Excel 1, extraction of
    descriptions from Excel 2, similarity search for missing categories, and
    writing the results into a copy saved as a temporary ``.xlsx`` file.

    Args:
        excel1: Uploaded source workbook (path or file-like object).
        excel2: Uploaded workbook to fill (path or file-like object).
        min_row1: First row to read from Excel 1.
        max_row1: Last row to read from Excel 1.
        min_row2: First row to read from and write to in Excel 2.
        max_row2: Last row to read from and write to in Excel 2.
        sheetname: Worksheet of Excel 2 to process.

    Returns:
        Path of the temporary ``.xlsx`` file holding the updated workbook.

    Raises:
        gr.Error: For any failure, so the message is shown in the interface.
    """
    try:
        if excel1 is None or excel2 is None:
            raise gr.Error("Please upload both Excel files before processing.")

        excel1_path = _as_path(excel1)
        excel2_path = _as_path(excel2)
        # Numeric UI inputs may arrive as floats; row bounds must be ints.
        min_row1, max_row1 = _to_row_number(min_row1), _to_row_number(max_row1)
        min_row2, max_row2 = _to_row_number(min_row2), _to_row_number(max_row2)

        gr.Info("Starting processing...")
        gr.Info("Doing lookup...")
        sheet_lookup(sheetname, excel2_path)

        # Process Excel 1
        gr.Info("Processing Excel 1...")
        csv1_data = filter_excel1(excel1_path, min_row1, max_row1)

        # Process Excel 2
        gr.Info("Processing Excel 2...")
        csv2_data = filter_excel2(excel2_path, min_row2, max_row2, sheetname)

        # Process data
        gr.Info("Running similarity search...")
        processed_data = process_data(csv1_data, csv2_data)

        # Update Excel 2 over the same row window the data was read from, so
        # each category is written to the row it was computed for.
        gr.Info("Updating Excel file...")
        updated_excel = update_excel(
            excel2_path,
            processed_data[1:],
            sheetname,
            min_row=min_row2,
            max_row=max_row2,
        )

        # Save the updated Excel file.  The temp handle is closed first, so
        # the workbook is not written to a path that is still open.
        gr.Info("Saving updated Excel file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
            output_path = tmp.name
        updated_excel.save(output_path)

        gr.Info("Processing complete!")
        return output_path
    except gr.Error:
        # Re-raise Gradio errors unchanged to display them in the interface.
        raise
    except Exception as e:
        # Catch any other unexpected errors.
        raise gr.Error(f"An unexpected error occurred: {str(e)}") from e
# Gradio interface: two workbook uploads, row windows for each, and the sheet
# of Excel 2 to process; the output is the updated workbook for download.
iface = gr.Interface(
    fn=process_files,
    inputs=[
        gr.File(label="Upload Source Excel (Excel 1)"),
        gr.File(label="Upload Excel to be Filled (Excel 2)"),
        # precision=0 makes Gradio round these to int; they are used as
        # worksheet row numbers.
        gr.Number(label="Min Row for Excel 1", value=2, precision=0),
        gr.Number(label="Max Row for Excel 1", value=1000, precision=0),
        gr.Number(label="Min Row for Excel 2", value=2, precision=0),
        gr.Number(label="Max Row for Excel 2", value=3009, precision=0),
        gr.Textbox(label="Sheet Name for Excel 2"),
    ],
    outputs=gr.File(label="Download Updated Excel"),
    title="Excel Processor",
    description="Upload two Excel files, specify row ranges, and download the processed Excel file.",
)
iface.launch()