# Page-scrape residue, not code: "Spaces: Sleeping" (presumably the hosting page's status banner).
import gradio as gr | |
import fitz # PyMuPDF | |
from transformers import AutoModel, AutoTokenizer | |
from PIL import Image | |
import numpy as np | |
import os | |
import base64 | |
import io | |
import uuid | |
import tempfile | |
import time | |
import shutil | |
from pathlib import Path | |
import json | |
import logging | |
# Configure logging once at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load the tokenizer and model for the 'ucaslcl/GOT-OCR2_0' checkpoint.
# trust_remote_code=True lets the checkpoint's own Python code run locally.
tokenizer = AutoTokenizer.from_pretrained(
    'ucaslcl/GOT-OCR2_0',
    trust_remote_code=True,
)
model = AutoModel.from_pretrained(
    'ucaslcl/GOT-OCR2_0',
    trust_remote_code=True,
    low_cpu_mem_usage=True,
    device_map='cuda',
    use_safetensors=True,
)
# Put the model in evaluation mode and move it to the CUDA device.
model = model.eval().cuda()
# Working directories: UPLOAD_FOLDER holds the copied PDF and per-page PNGs,
# RESULTS_FOLDER holds the per-page rendered HTML files.
UPLOAD_FOLDER = "./uploads"
RESULTS_FOLDER = "./results"

# Ensure directories exist. exist_ok=True makes creation idempotent and avoids
# the check-then-create race of testing os.path.exists() first.
for folder in (UPLOAD_FOLDER, RESULTS_FOLDER):
    os.makedirs(folder, exist_ok=True)
def image_to_base64(image):
    """Return the base64 text of *image* serialized as PNG.

    Args:
        image: Object exposing a PIL-style ``save(fp, format=...)`` method.

    Returns:
        A ``str`` containing the base64 encoding of the PNG bytes.
    """
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode()
def pdf_to_images(pdf_path):
    """Render every page of a PDF into a PIL image.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF (``fitz``).

    Returns:
        A list with one ``PIL.Image`` per page, in page order.
    """
    images = []
    # Open the document as a context manager so it is closed even when
    # rendering a page raises; previously the document was never closed.
    with fitz.open(pdf_path) as pdf_document:
        for page in pdf_document:
            pix = page.get_pixmap()
            # NOTE(review): "RGB" assumes the default pixmap is 3-channel RGB
            # without alpha -- confirm against the PyMuPDF version in use.
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            images.append(img)
    return images
def _remove_if_present(path):
    """Delete *path*, doing nothing when it does not exist."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def run_GOT(pdf_file):
    """OCR every page of an uploaded PDF with the GOT model.

    The PDF is copied into UPLOAD_FOLDER, rendered to one PNG per page, and
    each page is passed to ``model.chat_crop`` with ``render=True`` so that an
    HTML rendering is written to RESULTS_FOLDER. All temporary files are
    removed before returning, on both the success and the failure path.

    Args:
        pdf_file: Filesystem path of the uploaded PDF, or ``None`` when the
            handler is invoked without an upload.

    Returns:
        A ``(json_text, html)`` tuple. On success ``json_text`` is a JSON
        array with one object per page (``page_number``, ``text``, ``html``)
        and ``html`` is the concatenated per-page HTML. On failure
        ``json_text`` is a JSON object with an ``error`` key and ``html`` is
        ``None``.
    """
    if pdf_file is None:
        # Without this guard shutil.copy(None, ...) raises outside any handler.
        return json.dumps({"error": "No PDF file provided."}), None

    unique_id = str(uuid.uuid4())
    pdf_path = os.path.join(UPLOAD_FOLDER, f"{unique_id}.pdf")
    results = []
    html_parts = []
    # Scratch files of the page currently being processed; tracked so the
    # finally clause can remove them if that page fails midway.
    scratch_files = []
    try:
        # Copying and rasterizing happen inside the try so that a corrupt or
        # unreadable PDF is reported as an error and the copy is cleaned up.
        shutil.copy(pdf_file, pdf_path)
        images = pdf_to_images(pdf_path)

        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(
                UPLOAD_FOLDER, f"{unique_id}_page_{page_number}.png"
            )
            result_path = os.path.join(
                RESULTS_FOLDER, f"{unique_id}_page_{page_number}.html"
            )
            scratch_files = [image_path, result_path]

            image.save(image_path)
            logger.info(f"Processing page {page_number}...")
            res = model.chat_crop(
                tokenizer,
                image_path,
                ocr_type='format',
                render=True,
                save_render_file=result_path,
            )

            # Read the rendered HTML content
            if os.path.exists(result_path):
                with open(result_path, 'r', encoding='utf-8') as f:
                    page_html_content = f.read()
                logger.info(
                    f"HTML content for page {page_number} read successfully."
                )
            else:
                logger.error(
                    f"HTML file for page {page_number} not found at {result_path}."
                )
                page_html_content = ""

            results.append({
                "page_number": page_number,
                "text": res,
                "html": page_html_content
            })
            html_parts.append(f"<h2>Page {page_number}</h2>")
            # Add a separator between pages
            html_parts.append(page_html_content + "<br><hr><br>")

            for path in scratch_files:
                _remove_if_present(path)
            scratch_files = []
    except Exception as e:
        # Top-level UI boundary: log the traceback and report the failure.
        logger.exception("Error occurred: %s", e)
        # The first output is a JSON component, so the error must itself be
        # valid JSON rather than a bare "Error: ..." string.
        return json.dumps({"error": str(e)}), None
    finally:
        for path in (pdf_path, *scratch_files):
            _remove_if_present(path)

    html_content = "".join(html_parts)
    # Debug aid only; kept out of stdout at the default INFO level.
    logger.debug("Combined HTML content:\n%s", html_content)
    return json.dumps(results, indent=4), html_content
def cleanup_old_files(max_age_seconds=3600, folders=None):
    """Delete stale files from the working folders.

    Only regular files directly inside each folder are considered;
    subdirectories are left alone because ``Path.unlink`` cannot remove them.

    Args:
        max_age_seconds: A file is removed when its modification time lies
            more than this many seconds in the past. Defaults to one hour.
        folders: Iterable of directory paths to sweep. Defaults to
            ``UPLOAD_FOLDER`` and ``RESULTS_FOLDER``.
    """
    if folders is None:
        folders = (UPLOAD_FOLDER, RESULTS_FOLDER)

    current_time = time.time()
    for folder in folders:
        root = Path(folder)
        if not root.is_dir():
            # Nothing to sweep if the folder was never created.
            continue
        for file_path in root.glob('*'):
            try:
                if not file_path.is_file():
                    continue
                if current_time - file_path.stat().st_mtime > max_age_seconds:
                    file_path.unlink()
            except FileNotFoundError:
                # The file vanished between listing and stat/unlink (e.g. a
                # request finished and removed it); nothing left to do.
                continue
# Gradio UI: a file upload and submit button in one column, the JSON and HTML
# outputs in another.
# NOTE(review): the nesting below is reconstructed from the statement order;
# the original indentation was lost -- confirm against the deployed app.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            pdf_input = gr.File(type="filepath", label="Upload your PDF")
            submit_button = gr.Button("Submit")
        with gr.Column():
            ocr_result = gr.JSON(label="GOT output (JSON)")
            html_result = gr.HTML(label="GOT output (HTML)", sanitize=False)

    # Clicking Submit runs the OCR pipeline and fills both output components.
    submit_button.click(
        fn=run_GOT,
        inputs=[pdf_input],
        outputs=[ocr_result, html_result],
    )

if __name__ == "__main__":
    # Sweep leftover files once at startup, then serve the UI.
    cleanup_old_files()
    demo.launch()