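"""OCR utility built around a locally running Ollama vision model.

Images (or the first page of a PDF) are optionally preprocessed with OpenCV,
base64-encoded, and sent to the Ollama /api/generate endpoint for text
extraction in several output formats.
"""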
import base64
import concurrent.futures
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Union

import cv2
import requests
from pdf2image import convert_from_path
from tqdm import tqdm


class OCRProcessor:
    def __init__(self, model_name: str = "llama3.2-vision:11b",
                 base_url: str = "http://localhost:11434/api/generate",
                 max_workers: int = 1):
        self.model_name = model_name
        self.base_url = base_url
        self.max_workers = max_workers

    def _encode_image(self, image_path: str) -> str:
        """Convert an image file to a base64-encoded string."""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")

    def _preprocess_image(self, image_path: str) -> str:
        """
        Preprocess an image before OCR:
        - Convert PDF to image if needed
        - Enhance contrast
        - Reduce noise
        (Auto-rotation is not yet implemented; see the TODO below.)
        """
        temp_path = None

        # Handle PDF files by rasterizing the first page
        if image_path.lower().endswith('.pdf'):
            pages = convert_from_path(image_path)
            if not pages:
                raise ValueError("Could not convert PDF to image")
            # Save the first page as a temporary image
            temp_path = f"{image_path}_temp.jpg"
            pages[0].save(temp_path, 'JPEG')
            image_path = temp_path

        # Read image
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image at {image_path}")

        # Convert to grayscale
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Enhance contrast using CLAHE
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Denoise
        denoised = cv2.fastNlMeansDenoising(enhanced)

        # TODO: Implement rotation detection and correction

        # Save the preprocessed image
        preprocessed_path = f"{image_path}_preprocessed.jpg"
        cv2.imwrite(preprocessed_path, denoised)

        # Remove the temporary PDF page now that the preprocessed copy exists
        if temp_path is not None:
            os.remove(temp_path)

        return preprocessed_path

    def process_image(self, image_path: str, format_type: str = "markdown",
                      preprocess: bool = True) -> str:
        """
        Process an image and extract text in the specified format.

        Args:
            image_path: Path to the image file
            format_type: One of ["markdown", "text", "json", "structured", "key_value"]
            preprocess: Whether to apply image preprocessing
        """
        try:
            if preprocess:
                image_path = self._preprocess_image(image_path)

            image_base64 = self._encode_image(image_path)

            # Clean up temporary files created during preprocessing
            if image_path.endswith(('_preprocessed.jpg', '_temp.jpg')):
                os.remove(image_path)

            # Generic prompt templates for the different output formats
            prompts = {
                "markdown": """Please look at this image and extract all the text content. Format the output in markdown:
- Use headers (# ## ###) for titles and sections
- Use bullet points (-) for lists
- Use proper markdown formatting for emphasis and structure
- Preserve the original text hierarchy and formatting as much as possible""",

                "text": """Please look at this image and extract all the text content.
Provide the output as plain text, maintaining the original layout and line breaks where appropriate.
Include all visible text from the image.""",

                "json": """Please look at this image and extract all the text content. Structure the output as JSON with these guidelines:
- Identify different sections or components
- Use appropriate keys for different text elements
- Maintain the hierarchical structure of the content
- Include all visible text from the image""",

                "structured": """Please look at this image and extract all the text content, focusing on structural elements:
- Identify and format any tables
- Extract lists and maintain their structure
- Preserve any hierarchical relationships
- Format sections and subsections clearly""",

                "key_value": """Please look at this image and extract text that appears in key-value pairs:
- Look for labels and their associated values
- Extract form fields and their contents
- Identify any paired information
- Present each pair on a new line as 'key: value'"""
            }

            # Fall back to the plain-text prompt for unknown format types
            prompt = prompts.get(format_type, prompts["text"])

            # Prepare the request payload for Ollama's /api/generate endpoint
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": False,
                "images": [image_base64]
            }

            # Make the API call to Ollama
            response = requests.post(self.base_url, json=payload)
            response.raise_for_status()  # Raise an exception for bad status codes
            result = response.json().get("response", "")

            # For JSON output, re-format the result if it parses cleanly
            if format_type == "json":
                try:
                    json_data = json.loads(result)
                    return json.dumps(json_data, indent=2)
                except json.JSONDecodeError:
                    # If JSON parsing fails, return the raw result
                    return result

            return result
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def process_batch(
        self,
        input_path: Union[str, List[str]],
        format_type: str = "markdown",
        recursive: bool = False,
        preprocess: bool = True
    ) -> Dict[str, Any]:
        """
        Process multiple images in batch.

        Args:
            input_path: Path to a directory or a list of image paths
            format_type: Output format type
            recursive: Whether to search directories recursively
            preprocess: Whether to apply image preprocessing

        Returns:
            Dictionary with results and statistics
        """
        # Collect all image paths
        image_paths = []
        if isinstance(input_path, str):
            base_path = Path(input_path)
            if base_path.is_dir():
                pattern = '**/*' if recursive else '*'
                for ext in ['.png', '.jpg', '.jpeg', '.pdf', '.tiff']:
                    image_paths.extend(base_path.glob(f'{pattern}{ext}'))
            else:
                image_paths = [base_path]
        else:
            image_paths = [Path(p) for p in input_path]

        results = {}
        errors = {}

        # Process images in parallel with a progress bar
        with tqdm(total=len(image_paths), desc="Processing images") as pbar:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_path = {
                    executor.submit(self.process_image, str(path), format_type, preprocess): path
                    for path in image_paths
                }
                for future in concurrent.futures.as_completed(future_to_path):
                    path = future_to_path[future]
                    try:
                        results[str(path)] = future.result()
                    except Exception as e:
                        errors[str(path)] = str(e)
                    pbar.update(1)

        return {
            "results": results,
            "errors": errors,
            "statistics": {
                "total": len(image_paths),
                "successful": len(results),
                "failed": len(errors)
            }
        }
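

# A minimal usage sketch of the class above, not part of the original module.
# It assumes an Ollama server is reachable at http://localhost:11434 with the
# llama3.2-vision:11b model pulled, and the paths "invoice.png" and "scans/"
# are placeholders for your own files.
if __name__ == "__main__":
    processor = OCRProcessor(max_workers=2)

    # Single image, extracted as markdown
    text = processor.process_image("invoice.png", format_type="markdown")
    print(text)

    # Whole directory, processed recursively with key-value extraction
    batch = processor.process_batch("scans/", format_type="key_value", recursive=True)
    print(json.dumps(batch["statistics"], indent=2))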