# Spaces status: Runtime error
import gradio as gr | |
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor | |
from qwen_vl_utils import process_vision_info | |
import torch | |
import pandas as pd | |
from datetime import datetime | |
from azure.storage.blob import BlobServiceClient | |
from io import BytesIO | |
import re | |
import os

# Azure Storage settings.
#
# SECURITY: the account key used to be hard-coded on this line. Any key that
# has been committed to source must be treated as compromised and rotated.
# Credentials are now read from the environment (e.g. a deployment secret);
# the account and container names keep their previous values as defaults.
# When AZURE_STORAGE_ACCOUNT_KEY is not set, STORAGE_ACCOUNT_KEY is None.
STORAGE_ACCOUNT_NAME = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "piointernaldestrg")
STORAGE_ACCOUNT_KEY = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
CONTAINER_NAME = os.environ.get("AZURE_STORAGE_CONTAINER_NAME", "invoices")
# Load the processor and the model once, at import time, from the same checkpoint.
_CHECKPOINT = "Qwen/Qwen2-VL-2B-Instruct-AWQ"

processor = AutoProcessor.from_pretrained(_CHECKPOINT)
model = Qwen2VLForConditionalGeneration.from_pretrained(_CHECKPOINT, torch_dtype="auto")

# Move the weights to the GPU only when one is actually present.
if torch.cuda.is_available():
    model.to("cuda")
# Instruction sent to the model together with every invoice image.
_INVOICE_PROMPT = (
    "Please extract the following details from the invoice:\n"
    "- 'invoice_number'\n"
    "- 'date'\n"
    "- 'place of invoice (city)'\n"
    "- 'total amount'\n"
    "- 'category of invoice (like food, stay, travel, other)'"
)

# Columns of the returned DataFrame, in display order.
_INVOICE_FIELDS = (
    "invoice_number",
    "date",
    "place_of_invoice",
    "total_amount",
    "category_of_invoice",
)

# Substrings that mark a line as carrying the invoice total.
# ("total" already covers "total amount" and "grand total".)
_TOTAL_KEYWORDS = ("total", "final amount", "balance due")

# Amount with exactly two decimals; commas are accepted as thousands
# separators so "1,234.56" is captured whole instead of as "234.56".
_AMOUNT_RE = re.compile(r"\d[\d,]*\.\d{2}")


def _value_after_colon(line):
    """Return the stripped text after the FIRST colon, or the whole line if there is none."""
    # Splitting on the first colon keeps values that contain colons themselves
    # (e.g. a timestamp "10:30") intact.
    _, sep, value = line.partition(":")
    return (value if sep else line).strip()


def _parse_invoice_text(text):
    """Map the model's line-oriented answer onto the invoice fields."""
    data = dict.fromkeys(_INVOICE_FIELDS)
    for line in text.split("\n"):
        lowered = line.lower()
        # The order of these checks is significant: a line is assigned to the
        # first field whose keyword it contains.
        if "number" in lowered:
            data["invoice_number"] = _value_after_colon(line)
        elif "date" in lowered:
            data["date"] = process_date(_value_after_colon(line))
        elif "place of invoice" in lowered:
            data["place_of_invoice"] = _value_after_colon(line)
        elif any(keyword in lowered for keyword in _TOTAL_KEYWORDS):
            amounts = _AMOUNT_RE.findall(line)
            if amounts:
                # The last amount on the line wins; a later matching line
                # overwrites an earlier one.
                data["total_amount"] = amounts[-1].replace(",", "")
        elif "category of invoice" in lowered:
            data["category_of_invoice"] = _value_after_colon(line)
    return data


def _generate_invoice_text(model, processor, image_path):
    """Run the vision-language model on one image and return its decoded answer."""
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image_path},
                {"type": "text", "text": _INVOICE_PROMPT},
            ],
        }
    ]
    chat_text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[chat_text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    ).to(model.device)
    generated_ids = model.generate(**inputs, max_new_tokens=128)
    # Drop the prompt tokens so only the newly generated answer is decoded.
    answer_ids = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    decoded = processor.batch_decode(
        answer_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    return decoded[0]


def process_image_batch(model, processor, image_paths):
    """Extract invoice fields from each image and return them as a DataFrame.

    Args:
        model: Generation model; must provide ``generate`` and ``device``.
        processor: Matching processor; must provide ``apply_chat_template``,
            ``batch_decode`` and be callable to build model inputs.
        image_paths: Iterable of image references, or ``None``/empty for an
            empty batch.

    Returns:
        A ``pandas.DataFrame`` with one row per image and the columns
        ``invoice_number``, ``date``, ``place_of_invoice``, ``total_amount``
        and ``category_of_invoice``. If an image fails, its row holds
        ``"Error"`` in the first four columns and the exception message in
        ``category_of_invoice``.
    """
    results = []
    for image_path in image_paths or []:
        try:
            answer = _generate_invoice_text(model, processor, image_path)
            results.append(_parse_invoice_text(answer))
        except Exception as e:
            # Deliberate best-effort: one bad image must not abort the batch.
            results.append({
                "invoice_number": "Error",
                "date": "Error",
                "place_of_invoice": "Error",
                "total_amount": "Error",
                "category_of_invoice": str(e),
            })
    # Explicit columns keep the schema stable even for an empty batch.
    return pd.DataFrame(results, columns=list(_INVOICE_FIELDS))
def _parse_day_month_year(text):
    """Parse ``"<day> <month name> <year>"``; return a datetime or None."""
    # Try the abbreviated month name first ("Jan"), then the full one ("January").
    for fmt in ("%d %b %Y", "%d %B %Y"):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    return None


def process_date(date_str):
    """Normalize an extracted date string to ``DD/MM/YYYY`` where possible.

    Handled shapes:
      * ``DD/MM/YYYY``          -> returned unchanged
      * ``D[D] Mon[th] YYYY``   -> ``DD/MM/YYYY``
      * ``D[D] Mon[th]``        -> ``DD/MM/YYYY`` with the literal text ``YYYY``
        standing in for the unknown year

    Anything else, including values that look like one of the shapes but do
    not parse and non-string values, is returned unchanged.
    """
    if not isinstance(date_str, str):
        return date_str
    if re.match(r"\d{2}/\d{2}/\d{4}", date_str):
        return date_str
    if re.match(r"\d{1,2} \w+ \d{4}", date_str):
        parsed = _parse_day_month_year(date_str)
        return parsed.strftime("%d/%m/%Y") if parsed is not None else date_str
    if re.match(r"\d{1,2} \w+", date_str):
        # strptime defaults a missing year to 1900, which is not a leap year
        # and therefore rejects "29 Feb"; parse against a leap year instead.
        parsed = _parse_day_month_year(f"{date_str} 2000")
        return parsed.strftime("%d/%m") + "/YYYY" if parsed is not None else date_str
    return date_str
def upload_to_azure_blob(df):
    """Serialize *df* to Parquet and upload it to the configured blob container.

    The blob is named ``invoice_data_<YYYYmmdd_HHMMSS>.parquet`` and an
    existing blob with the same name is overwritten.

    Returns:
        The blob URL as a string on success, or ``{"error": <message>}`` if
        any step raised.
    """
    try:
        # Write the Parquet bytes into memory rather than a temporary file.
        buffer = BytesIO()
        df.to_parquet(buffer, index=False)

        service = BlobServiceClient(
            account_url=f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
            credential=STORAGE_ACCOUNT_KEY,
        )

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        blob_name = f"invoice_data_{stamp}.parquet"
        target = service.get_blob_client(container=CONTAINER_NAME, blob=blob_name)
        target.upload_blob(buffer.getvalue(), overwrite=True)
    except Exception as exc:
        return {"error": str(exc)}

    return f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/{blob_name}"
def gradio_interface(username, email, image_files):
    """Gradio callback: extract invoice data, upload it, and report the result.

    Args:
        username: Text from the "Username" box.
        email: Text from the "Email" box.
        image_files: Uploaded invoice image paths, or ``None`` when nothing
            was uploaded.

    Returns:
        A tuple ``(user_info, df, url_message)`` matching the three output
        components: a summary string, the extracted DataFrame, and a string
        carrying the result of ``upload_to_azure_blob``.
    """
    # Gradio passes None when the file input is left empty; treat that as an
    # empty batch instead of crashing while iterating over None.
    df = process_image_batch(model, processor, image_files or [])
    file_url = upload_to_azure_blob(df)
    user_info = f"Username: {username}\nEmail: {email}"
    return user_info, df, f"Parquet File URL: {file_url}"
# Input widgets, in the positional order gradio_interface receives them.
_interface_inputs = [
    gr.Textbox(label="Username"),
    gr.Textbox(label="Email"),
    gr.Files(label="Upload Invoice Images", type="filepath"),
]

# Output widgets, in the order of gradio_interface's returned tuple.
_interface_outputs = [
    gr.Textbox(label="User Info"),
    gr.Dataframe(label="Extracted Invoice Data"),
    gr.Textbox(label="Parquet File URL"),
]

grpc_interface = gr.Interface(
    fn=gradio_interface,
    inputs=_interface_inputs,
    outputs=_interface_outputs,
    title="Invoice Extraction System",
    description="Upload invoices, extract details, and save to Azure Blob Storage.",
)

# Start the web app only when this file is run as a script.
if __name__ == "__main__":
    grpc_interface.launch(share=True)