# umairahmad89
# Add lookup to previous quarter sheet and handle no previous quarter order
# f0a94b0
import gradio as gr
import openpyxl
import csv
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import tempfile
import os
import pandas as pd
import re
# Load the sentence transformer model once, at import time.
EMBEDDING_MODEL_NAME = 'BAAI/bge-small-en-v1.5'
model = SentenceTransformer(EMBEDDING_MODEL_NAME)
def filter_excel1(excel_path, min_row, max_row):
    """Collect (category, diagnostic statement) pairs from the first worksheet.

    For every row in ``min_row..max_row`` the cell at index 1 is read as the
    category and the cell at index 5 as the diagnostic statement. A row whose
    category cell is empty takes the category carried over from the rows
    above it.

    Args:
        excel_path: Workbook location handed to ``openpyxl.load_workbook``.
        min_row: First worksheet row to read.
        max_row: Last worksheet row to read.

    Returns:
        A list of two-item lists; the first entry is the header
        ``["category", "diagnostic_statement"]``.

    Raises:
        gr.Error: If anything goes wrong while reading the workbook.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        first_sheet = workbook.worksheets[0]
        rows = [["category", "diagnostic_statement"]]
        carried = ""
        for cells in first_sheet.iter_rows(min_row=min_row, max_row=max_row):
            label = cells[1].value
            statement = cells[5].value
            if label:
                # A filled-in cell becomes the category carried to later rows.
                carried = label
            else:
                # Until a carried value exists, remember this cell's own
                # (empty) value so leading blank rows stay blank.
                if carried == "":
                    carried = label
                label = carried
            rows.append([label, statement])
        return rows
    except Exception as e:
        raise gr.Error(f"Error processing Excel 1: {str(e)}")
def filter_excel2(excel_path, min_row, max_row, sheetname):
    """Collect (description, category) pairs from the named worksheet.

    For every row in ``min_row..max_row`` the cell at index 0 is read as the
    description and the cell at index 6 as the category. Any category that is
    not a string, or that is the literal ``"#N/A"``, is normalised to
    ``"#N/A"``. Rows without a description are left out.

    Args:
        excel_path: Workbook location handed to ``openpyxl.load_workbook``.
        min_row: First worksheet row to read.
        max_row: Last worksheet row to read.
        sheetname: Name of the worksheet to read.

    Returns:
        A list of two-item lists; the first entry is the header
        ``["description", "category"]``.

    Raises:
        gr.Error: If anything goes wrong while reading the workbook.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        target_sheet = workbook[sheetname]
        rows = [["description", "category"]]
        for cells in target_sheet.iter_rows(min_row=min_row, max_row=max_row):
            description = cells[0].value
            category = cells[6].value
            # Only a genuine string other than the "#N/A" marker counts as an
            # already-assigned category.
            has_category = isinstance(category, str) and category != "#N/A"
            if not description:
                continue
            rows.append([description, category if has_category else "#N/A"])
        return rows
    except Exception as e:
        raise gr.Error(f"Error processing Excel 2: {str(e)}")
def sheet_lookup(current_sheet_name, excel_file_path):
    """Pre-fill the current quarter's mapping column from the previous quarter.

    The sheet name must start with ``<quarter>Q<year>`` (for example
    ``2Q2024``). The previous quarter's sheet name is derived from it. If that
    sheet exists in the workbook, each row of the current sheet gets the
    previous quarter's "CRI Profile Mapping" value looked up by
    ``Monitoring Tool Instance ID-AU``; rows with no match get ``'#N/A'``.
    If the previous sheet is missing, the whole column is filled with
    ``'#N/A'``. The current sheet is then written back to the same file,
    replacing the existing sheet.

    Note that rows of the current sheet with a duplicate lookup ID are dropped
    (first occurrence kept) before the sheet is written back.

    Args:
        current_sheet_name: Name of the sheet to process, e.g. ``"1Q2024"``.
        excel_file_path: Workbook to read from and write back to.

    Raises:
        ValueError: If ``current_sheet_name`` does not start with a quarter
            digit 1-4, a ``Q`` and a four-digit year.
    """
    # Validate the name before touching the file so bad input fails fast.
    # Only 1-4 are real quarters; other digits would yield nonsense previous
    # sheet names such as "-1Q2024".
    match = re.match(r'([1-4])Q(\d{4})', current_sheet_name)
    if not match:
        raise ValueError("Invalid sheet name format")

    quarter, year = map(int, match.groups())
    # Quarter 1 wraps around to quarter 4 of the previous year.
    if quarter == 1:
        prev_quarter, prev_year = 4, year - 1
    else:
        prev_quarter, prev_year = quarter - 1, year
    prev_sheet_name = f"{prev_quarter}Q{prev_year}"
    result_col = f"{quarter}q CRI Profile Mapping"

    # Read everything needed inside the context manager so the workbook
    # handle is closed before the same file is reopened for writing below.
    with pd.ExcelFile(excel_file_path) as xl:
        current_df = xl.parse(current_sheet_name)
        if prev_sheet_name in xl.sheet_names:
            prev_df = xl.parse(prev_sheet_name)
        else:
            prev_df = None

    if prev_df is not None:
        lookup_col = 'Monitoring Tool Instance ID-AU'
        value_col = f"{prev_quarter}q CRI Profile Mapping"
        current_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True)
        prev_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True)
        # Map lookup ID -> previous quarter's value.
        lookup_dict = dict(zip(prev_df[lookup_col], prev_df[value_col]))
        current_df[result_col] = current_df[lookup_col].map(lookup_dict).fillna('#N/A')
    else:
        # No previous quarter to look up from: mark every row as unmapped.
        current_df[result_col] = '#N/A'
        print(f"Warning: Previous sheet {prev_sheet_name} not found. Filling {result_col} with '#N/A'")

    # Save the results back to the Excel file, replacing the current sheet.
    with pd.ExcelWriter(excel_file_path, mode='a', if_sheet_exists='replace') as writer:
        current_df.to_excel(writer, sheet_name=current_sheet_name, index=False)
    print(f"Processing complete for sheet {current_sheet_name}")
def get_embeddings(texts):
    """Encode ``texts`` with the module-level ``model`` and return the result."""
    embeddings = model.encode(texts)
    return embeddings
def get_top_n_categories(query_embedding, statement_embeddings, categories, n=3):
    """Return the categories of the ``n`` statements most similar to the query.

    Similarity is the cosine similarity between ``query_embedding`` and each
    row of ``statement_embeddings``. ``categories[i]`` is taken to belong to
    ``statement_embeddings[i]``. The result is ordered from most to least
    similar; the same category can appear more than once when several of the
    top statements share it.

    Args:
        query_embedding: Embedding vector of the query text.
        statement_embeddings: One embedding per reference statement.
        categories: Category for each reference statement, in the same order.
        n: How many categories to return. Defaults to 3.

    Returns:
        A list of at most ``n`` categories; empty when ``n`` is not positive.
    """
    # A slice of [-0:] selects the whole array and a negative n selects the
    # wrong end, so a non-positive n has to be handled explicitly.
    if n <= 0:
        return []
    similarities = cosine_similarity([query_embedding], statement_embeddings)[0]
    # argsort is ascending: take the last n indices and reverse them.
    top_indices = np.argsort(similarities)[-n:][::-1]
    return [categories[i] for i in top_indices]
def process_data(csv1_data, csv2_data):
    """Fill in missing categories in ``csv2_data`` by similarity search.

    Both inputs are lists of two-item rows whose first entry is a header.
    ``csv1_data`` rows are ``[category, diagnostic_statement]``;
    ``csv2_data`` rows are ``[description, category]``.

    For each ``csv2_data`` row, in order:
      * if its description was seen earlier, it reuses that row's category;
      * otherwise, if its category is not ``"#N/A"``, it is kept as-is;
      * otherwise the category becomes the comma-joined top categories
        returned by ``get_top_n_categories`` for the description.

    ``csv2_data`` is modified in place and also returned.

    Args:
        csv1_data: Reference rows (header first).
        csv2_data: Rows to complete (header first).

    Returns:
        The same ``csv2_data`` list with categories filled in.

    Raises:
        gr.Error: If anything goes wrong during processing.
    """
    try:
        reference_rows = csv1_data[1:]
        diagnostic_statements = [row[1] for row in reference_rows]
        statement_embeddings = get_embeddings(diagnostic_statements)
        categories = [row[0] for row in reference_rows]

        # description -> category decided the first time it was seen.
        # A dict gives constant-time lookup instead of scanning a list on
        # every row.
        category_by_description = {}
        for row in csv2_data[1:]:
            description = row[0]
            if description in category_by_description:
                row[1] = category_by_description[description]
                continue
            if row[1] == "#N/A":
                description_embedding = get_embeddings([description])[0]
                top_categories = get_top_n_categories(
                    description_embedding, statement_embeddings, categories
                )
                row[1] = ', '.join(top_categories)
            category_by_description[description] = row[1]
        return csv2_data
    except Exception as e:
        raise gr.Error(f"Error processing data: {str(e)}") from e
def update_excel(excel_path, processed_data, sheetname, min_row=2, max_row=None):
    """Write processed categories back into the cell at index 6 of each row.

    Rows of ``sheetname`` are visited in order; rows whose first cell is
    empty are skipped. The k-th remaining row receives
    ``processed_data[k][1]``. For the values to land on the rows they were
    computed from, ``min_row``/``max_row`` must be the same range that was
    used to read the rows in the first place.

    Args:
        excel_path: Workbook location handed to ``openpyxl.load_workbook``.
        processed_data: ``[description, category]`` rows, without a header.
        sheetname: Name of the worksheet to update.
        min_row: First worksheet row to update. Defaults to 2.
        max_row: Last worksheet row to update; ``None`` means no upper bound.

    Returns:
        The loaded workbook with the updated cells (not yet saved).

    Raises:
        gr.Error: If anything goes wrong while updating the workbook.
    """
    try:
        excel = openpyxl.load_workbook(excel_path)
        sheet_0 = excel[sheetname]
        idx = 0
        for row in sheet_0.iter_rows(min_row=min_row, max_row=max_row):
            description = row[0]
            category = row[6]
            if not description.value:
                continue
            try:
                new_value = processed_data[idx][1]
            except IndexError:
                # More described rows in the sheet than processed entries.
                print(f"Warning: Not enough processed data for row {category.row}")
                continue
            sheet_0.cell(row=category.row, column=category.col_idx, value=new_value)
            idx += 1
        return excel
    except Exception as e:
        raise gr.Error(f"Error updating Excel: {str(e)}") from e


def _as_row_number(value):
    """Convert a UI-supplied row bound to ``int``, keeping ``None`` as-is."""
    return None if value is None else int(value)


def process_files(excel1, excel2, min_row1, max_row1, min_row2, max_row2, sheetname):
    """Run the full pipeline and return the path of the updated workbook.

    Steps: previous-quarter lookup on ``excel2``, read the reference rows
    from ``excel1``, read the rows to fill from ``excel2``, run the similarity
    search, write the categories back, and save the result to a new temporary
    ``.xlsx`` file.

    Args:
        excel1: Source workbook (reference categories and statements).
        excel2: Workbook to be filled.
        min_row1: First row to read from ``excel1``.
        max_row1: Last row to read from ``excel1``.
        min_row2: First row to read from / write to ``excel2``.
        max_row2: Last row to read from / write to ``excel2``.
        sheetname: Worksheet of ``excel2`` to process.

    Returns:
        Path of the temporary file holding the updated workbook.

    Raises:
        gr.Error: On any failure, so the message is shown in the interface.
    """
    try:
        # Number inputs may arrive as floats; worksheet row bounds must be
        # integers (or None for "no bound").
        min_row1 = _as_row_number(min_row1)
        max_row1 = _as_row_number(max_row1)
        min_row2 = _as_row_number(min_row2)
        max_row2 = _as_row_number(max_row2)

        gr.Info("Starting processing...")
        gr.Info("Doing lookup...")
        sheet_lookup(sheetname, excel2)
        # Process Excel 1
        gr.Info("Processing Excel 1...")
        csv1_data = filter_excel1(excel1, min_row1, max_row1)
        # Process Excel 2
        gr.Info("Processing Excel 2...")
        csv2_data = filter_excel2(excel2, min_row2, max_row2, sheetname)
        # Process data
        gr.Info("Running similarity search...")
        processed_data = process_data(csv1_data, csv2_data)
        # Update Excel 2 over the same row range that was read above, so each
        # category is written to the row it was computed from.
        gr.Info("Updating Excel file...")
        updated_excel = update_excel(
            excel2, processed_data[1:], sheetname, min_row=min_row2, max_row=max_row2
        )
        # Save the updated Excel file. The temp file is only used to reserve
        # a name; it is closed before saving so the path is not held open
        # while the workbook is written.
        gr.Info("Saving updated Excel file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
            output_path = tmp.name
        updated_excel.save(output_path)
        gr.Info("Processing complete!")
        return output_path
    except gr.Error:
        # Re-raise Gradio errors to display them in the interface
        raise
    except Exception as e:
        # Catch any other unexpected errors
        raise gr.Error(f"An unexpected error occurred: {str(e)}") from e
# Gradio interface: components are listed in the order of process_files' parameters.
_input_components = [
    gr.File(label="Upload Source Excel (Excel 1)"),
    gr.File(label="Upload Excel to be Filled (Excel 2)"),
    gr.Number(label="Min Row for Excel 1", value=2),
    gr.Number(label="Max Row for Excel 1", value=1000),
    gr.Number(label="Min Row for Excel 2", value=2),
    gr.Number(label="Max Row for Excel 2", value=3009),
    gr.Textbox(label="Sheet Name for Excel 2"),
]
_output_component = gr.File(label="Download Updated Excel")

iface = gr.Interface(
    fn=process_files,
    inputs=_input_components,
    outputs=_output_component,
    title="Excel Processor",
    description="Upload two Excel files, specify row ranges, and download the processed Excel file.",
)
iface.launch()