# Hugging Face Spaces page header ("Spaces: Sleeping") captured during export;
# kept as a comment so the file remains valid Python.
"""Streamlit app that extracts structured shipping-document information
from uploaded PDF/image files using the OpenAI vision API."""

# Standard library
import base64
import io
import os
import tempfile

# Third-party
import fitz  # PyMuPDF
import streamlit as st
from dotenv import load_dotenv
from openai import OpenAI
from PIL import Image

# Load environment variables (expects OPENAI_API_KEY in .env or the environment)
load_dotenv()

# Single shared OpenAI client for the whole app
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
def convert_pdf_to_images(pdf_file):
    """Render each page of an uploaded PDF as a PIL image.

    Args:
        pdf_file: File-like object (e.g. a Streamlit ``UploadedFile``)
            whose ``getvalue()`` returns the raw PDF bytes.

    Returns:
        list: One ``PIL.Image.Image`` per page, in page order.
    """
    images = []
    # BUG FIX: the previous version wrote a NamedTemporaryFile and only
    # unlinked it on the success path, leaking the file if rendering
    # raised. Opening fitz directly from the in-memory bytes removes the
    # temp file (and the leak) entirely.
    pdf_document = fitz.open(stream=pdf_file.getvalue(), filetype="pdf")
    try:
        for page_number in range(pdf_document.page_count):
            page = pdf_document[page_number]
            pix = page.get_pixmap()
            # Round-trip through PNG bytes to obtain a PIL image.
            img_data = pix.tobytes("png")
            images.append(Image.open(io.BytesIO(img_data)))
    finally:
        # Always release the document, even if a page fails to render.
        pdf_document.close()
    return images
def format_response(text):
    """Render the model's 'Label: Value' output as tidy Markdown.

    Splits *text* on the literal word "Page", emits a "### Page N"
    heading per section, and converts each "Label: Value" line into a
    Markdown bullet. Blank lines, lines beginning with '*', and lines
    beginning with 'Here' are dropped.
    """
    sections = text.split("Page")
    rendered = []
    # sections[0] is everything before the first "Page" marker — skip it.
    for index, body in enumerate(sections[1:], 1):
        rendered.append(f'\n### Page {index}\n')
        for raw_line in body.split('\n'):
            stripped = raw_line.strip()
            # Drop blanks, emphasis/bullet markers, and preamble lines.
            if not stripped or stripped.startswith('*') or stripped.startswith('Here'):
                continue
            # Strip Markdown bold markers and leading dashes.
            cleaned = raw_line.replace('**', '').replace('- ', '')
            if ':' in cleaned:
                label, value = cleaned.split(':', 1)
                rendered.append(f'- *{label.strip()}*: {value.strip()}\n')
    return ''.join(rendered)
def analyze_image(image):
    """Extract shipping-document fields from one page image via OpenAI.

    Args:
        image: PIL image of a single document page.

    Returns:
        str: The model's 'Label: Value' analysis text, or an
        "An error occurred: ..." message string if the API call fails
        (the caller renders either one directly in the UI).
    """
    try:
        # Encode the page as base64 PNG for the data-URL payload.
        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format='PNG')
        base64_image = base64.b64encode(img_byte_arr.getvalue()).decode("utf-8")
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": """Please analyze the image and extract the following information:
                            - Sender information
                            - Recipient information
                            - Container details
                            - Weights and measurements
                            - Dates and reference numbers
                            - Cargo details
                            Format the response as 'Label: Value' pairs."""
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                # BUG FIX: the payload is PNG (saved above),
                                # so the MIME type must match — it previously
                                # claimed image/jpeg.
                                "url": f"data:image/png;base64,{base64_image}"
                            },
                        },
                    ],
                }
            ],
            max_tokens=1000
        )
        return response.choices[0].message.content
    except Exception as e:
        # Surface the failure to the UI rather than crashing the app.
        return f"An error occurred: {str(e)}"
def main():
    """Render the Streamlit UI: upload a document, analyze it, show results."""
    st.set_page_config(page_title="Document Analysis App", layout="wide")
    st.title("Document Analysis App")

    uploaded_file = st.file_uploader(
        "Upload document (PDF/Image)", type=['pdf', 'png', 'jpg', 'jpeg']
    )
    # Nothing to do until the user uploads something.
    if uploaded_file is None:
        return

    if uploaded_file.type == "application/pdf":
        # Render every PDF page up front so the button click only
        # triggers the (slow) per-page analysis.
        with st.spinner("Processing PDF..."):
            page_images = convert_pdf_to_images(uploaded_file)
        if st.button("Extract Information"):
            with st.spinner("Analyzing document..."):
                page_reports = [
                    f"Page {number} Information:\n{analyze_image(page)}"
                    for number, page in enumerate(page_images, 1)
                ]
                st.markdown(format_response("\n\n".join(page_reports)))
    else:
        # Single image upload — analyze it directly.
        picture = Image.open(uploaded_file)
        if st.button("Extract Information"):
            with st.spinner("Analyzing document..."):
                st.markdown(format_response(analyze_image(picture)))
# Hugging Face Spaces executes this file directly as a script, so the
# usual `if __name__ == "__main__"` guard is unnecessary here.
main()