"""FastAPI service that runs the GOT-OCR 2.0 model over each page of an uploaded PDF.

Endpoints:
    GET  /             -> renders ``templates/index.html``.
    POST /uploadfile/  -> accepts a PDF upload and returns one OCR result per page.
"""

import base64
import io
import json
import logging
import os
import shutil
import tempfile
import time
import uuid
from contextlib import suppress
from pathlib import Path

import fitz  # PyMuPDF
import numpy as np
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from PIL import Image
from starlette.requests import Request
from transformers import AutoModel, AutoTokenizer

logger = logging.getLogger(__name__)

app = FastAPI()

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for simplicity
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Load tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('ucaslcl/GOT-OCR2_0', trust_remote_code=True)
model = AutoModel.from_pretrained(
    'ucaslcl/GOT-OCR2_0',
    trust_remote_code=True,
    device_map='cuda',
    use_safetensors=True,
)
model = model.eval().cuda()

UPLOAD_FOLDER = "./uploads"
RESULTS_FOLDER = "./results"

# Ensure directories exist
for folder in [UPLOAD_FOLDER, RESULTS_FOLDER]:
    os.makedirs(folder, exist_ok=True)


def image_to_base64(image):
    """Return *image* (a PIL image) encoded as a base64 string of its PNG bytes."""
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode()


def pdf_to_images(pdf_path):
    """Render every page of the PDF at *pdf_path* to an RGB PIL image.

    Args:
        pdf_path: Filesystem path of the PDF to rasterize.

    Returns:
        A list with one ``PIL.Image.Image`` per page, in page order.
    """
    images = []
    # The context manager closes the document even if a page fails to render.
    with fitz.open(pdf_path) as pdf_document:
        for page in pdf_document:
            pix = page.get_pixmap()
            images.append(
                Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            )
    return images


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    with suppress(FileNotFoundError):
        os.remove(path)


def _ocr_page(image, unique_id, page_number):
    """Run OCR on a single page image and build its result entry.

    The page is written to ``UPLOAD_FOLDER`` as a PNG, passed to
    ``model.chat_crop`` with rendering enabled, and the rendered HTML is read
    back from ``RESULTS_FOLDER``. Both temporary files are removed afterwards,
    whether or not OCR succeeded.

    Args:
        image: PIL image of the page.
        unique_id: Identifier of the current request, used in temp file names.
        page_number: 1-based page index.

    Returns:
        A dict with the keys ``page_number``, ``text``, ``html`` and
        ``download_link``.
    """
    image_path = os.path.join(UPLOAD_FOLDER, f"{unique_id}_page_{page_number}.png")
    result_path = os.path.join(RESULTS_FOLDER, f"{unique_id}_page_{page_number}.html")
    try:
        image.save(image_path)
        res = model.chat_crop(
            tokenizer,
            image_path,
            ocr_type='format',
            render=True,
            save_render_file=result_path,
        )
        # Read the rendered HTML content.
        # NOTE(review): encoding is left at the platform default so it matches
        # however the model wrote the file -- confirm the renderer's encoding.
        with open(result_path, 'r') as f:
            html_content = f.read()
    finally:
        # Clean up per-page artifacts even when OCR or the read raised.
        _remove_if_exists(image_path)
        _remove_if_exists(result_path)

    # Encode the HTML content to base64 so it can be embedded as a data URI.
    encoded_html = base64.b64encode(html_content.encode('utf-8')).decode('utf-8')
    iframe_src = f"data:text/html;base64,{encoded_html}"
    # The original literals here were empty / bare text while iframe_src was
    # computed but unused; the markup below restores the evident intent.
    iframe = f'<iframe src="{iframe_src}" width="100%" height="600px"></iframe>'
    download_link = (
        f'<a href="{iframe_src}" '
        f'download="result_{unique_id}_page_{page_number}.html">'
        f'Download Full Result</a>'
    )
    return {
        "page_number": page_number,
        "text": res,  # Directly use the output from model.chat_crop
        "html": iframe,
        "download_link": download_link,
    }


def run_GOT(pdf_file):
    """OCR every page of *pdf_file* and collect the per-page results.

    The PDF is copied into ``UPLOAD_FOLDER`` under a fresh UUID name, rendered
    to images, and each page is processed in order. The copied PDF is always
    removed before returning.

    Args:
        pdf_file: Path of the PDF to process.

    Returns:
        On success, ``(json_string, results)`` where ``results`` is a list of
        per-page dicts and ``json_string`` is that list serialized as JSON.
        On failure, ``("Error: <message>", None)``.
    """
    unique_id = str(uuid.uuid4())
    pdf_path = os.path.join(UPLOAD_FOLDER, f"{unique_id}.pdf")
    results = []
    try:
        # Copy and rasterize inside the try so a bad PDF is reported through
        # the same error return and the copied file is still cleaned up.
        shutil.copy(pdf_file, pdf_path)
        images = pdf_to_images(pdf_path)
        for page_number, image in enumerate(images, start=1):
            results.append(_ocr_page(image, unique_id, page_number))
    except Exception as e:
        # Boundary handler: keep the traceback in the logs, report to caller.
        logger.exception("OCR failed for %s", pdf_file)
        return f"Error: {str(e)}", None
    finally:
        _remove_if_exists(pdf_path)
    return json.dumps(results, indent=4), results


def cleanup_old_files(max_age_seconds=3600):
    """Delete files in the upload and results folders older than the given age.

    Args:
        max_age_seconds: Files whose modification time is more than this many
            seconds in the past are removed. Defaults to one hour.
    """
    current_time = time.time()
    for folder in [UPLOAD_FOLDER, RESULTS_FOLDER]:
        for file_path in Path(folder).glob('*'):
            # A file may disappear between listing and stat/unlink.
            with suppress(FileNotFoundError):
                if not file_path.is_file():
                    continue  # unlink() would fail on directories
                if current_time - file_path.stat().st_mtime > max_age_seconds:
                    file_path.unlink()


cleanup_old_files()

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the landing page rendered from ``index.html``."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/uploadfile/", response_class=JSONResponse)
async def upload_file(file: UploadFile = File(...)):
    """Accept a PDF upload, run OCR on it, and return the per-page results.

    Args:
        file: The uploaded PDF.

    Returns:
        The list of per-page result dicts produced by ``run_GOT``.

    Raises:
        HTTPException: 500 with the error text when ``run_GOT`` reports failure.
    """
    # The temp directory is removed on exit from the block, even on error.
    with tempfile.TemporaryDirectory() as temp_dir:
        # A fixed name is used instead of the client-supplied file.filename so
        # the request cannot influence where the upload is written.
        temp_pdf_path = os.path.join(temp_dir, "upload.pdf")
        with open(temp_pdf_path, "wb") as buffer:
            buffer.write(await file.read())
        # NOTE(review): run_GOT is synchronous and is called directly from this
        # async handler, so the event loop is occupied while it runs.
        json_output, results = run_GOT(temp_pdf_path)

    if results is None:
        # json_output carries the "Error: ..." message in the failure case.
        raise HTTPException(status_code=500, detail=json_output)
    return results


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)