# app.py
from flask import Flask, request, jsonify, render_template_string
import PyPDF2
import sqlite3
from datetime import datetime
import re
import os
import hashlib
from typing import List, Dict
import shutil
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
import numpy as np
import joblib
import base64
from werkzeug.utils import secure_filename
import tempfile


class PersonIdentifier:
    def __init__(self):
        self.name_patterns = [
            r'(?:Mr\.|Mrs\.|Ms\.|Dr\.)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',  # Titles with names
            r'Name:?\s*([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',                     # Names with "Name:" prefix
            r'(?m)^([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)$',                        # Names on their own line
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)'                               # General names
        ]
        self.id_patterns = {
            'ssn': r'(?!000|666|9\d{2})\d{3}-(?!00)\d{2}-(?!0000)\d{4}',
            'drivers_license': r'[A-Z]\d{7}',
            'passport': r'[A-Z]\d{8}',
        }
        self.email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'

    def identify_person(self, text: str) -> Dict:
        person_data = {
            'name': None,
            'id_numbers': {},
            'email': None
        }

        # Extract name: the most specific patterns come first; stop at the first match
        for pattern in self.name_patterns:
            names = re.findall(pattern, text)
            if names:
                person_data['name'] = names[0].strip()
                break

        # Extract ID numbers (SSN, driver's license, passport)
        for id_type, pattern in self.id_patterns.items():
            ids = re.findall(pattern, text)
            if ids:
                person_data['id_numbers'][id_type] = ids[0]

        # Extract email address
        emails = re.findall(self.email_pattern, text)
        if emails:
            person_data['email'] = emails[0]

        return person_data


class MLDocumentClassifier:
    def __init__(self):
        self.labels = [
            'Invoice',
            'BankApplication_CreditCard',
            'BankApplication_SavingsAccount',
            'ID_DriversLicense',
            'ID_Passport',
            'ID_StateID',
            'Financial_PayStub',
            'Financial_TaxReturn',
            'Financial_IncomeStatement',
            'Receipt'
        ]

    def predict(self, text):
        return self._rule_based_classify(text)

    def _rule_based_classify(self, text):
        text_lower = text.lower()

        # Primary document indicators (strong signals)
        if 'invoice' in text_lower or 'inv-' in text_lower:
            return 'Invoice'

        rules = [
            ('BankApplication_CreditCard', ['credit card application', 'card request', 'new card']),
            ('BankApplication_SavingsAccount', ['savings account', 'open account', 'new account']),
            ('ID_DriversLicense', ['driver license', 'driving permit', 'operator license']),
            ('ID_Passport', ['passport', 'travel document']),
            ('ID_StateID', ['state id', 'identification card']),
            ('Financial_PayStub', ['pay stub', 'salary', 'wages']),
            ('Financial_TaxReturn', ['tax return', 'form 1040', 'tax year']),
            ('Financial_IncomeStatement', ['income statement', 'earnings report']),
            ('Receipt', ['receipt', 'payment received', 'transaction record'])
        ]

        # Score each type by the fraction of its keywords present in the text
        max_score = 0
        best_type = 'Unknown'
        for doc_type, keywords in rules:
            score = sum(1 for keyword in keywords if keyword in text_lower)
            weighted_score = score / len(keywords) if keywords else 0
            if weighted_score > max_score:
                max_score = weighted_score
                best_type = doc_type

        return best_type


class EnhancedDocProcessor:
    def __init__(self):
        # In-memory database: processed data is lost when the process exits
        self.conn = sqlite3.connect(':memory:', check_same_thread=False)
        self.setup_database()
        self.classifier = MLDocumentClassifier()
        self.person_identifier = PersonIdentifier()

    def setup_database(self):
        self.conn.executescript('''
            CREATE TABLE IF NOT EXISTS persons (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT,
                ssn TEXT,
                drivers_license TEXT,
                passport TEXT,
                created_date TEXT
            );
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY,
                filename TEXT,
                doc_type TEXT,
                person_id INTEGER,
                amount REAL,
                date TEXT,
                account_number TEXT,
                raw_text TEXT,
                processed_date TEXT,
                file_hash TEXT,
                confidence_score REAL,
                FOREIGN KEY (person_id) REFERENCES persons (id)
            );
            CREATE TABLE IF NOT EXISTS similar_docs (
                doc_id INTEGER,
                similar_doc_id INTEGER,
                similarity_score REAL,
                FOREIGN KEY (doc_id) REFERENCES documents (id),
                FOREIGN KEY (similar_doc_id) REFERENCES documents (id)
            );
        ''')
        self.conn.commit()

    def extract_text(self, pdf_path: str) -> str:
        try:
            text_parts = []
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                for page in reader.pages:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
            return "\n".join(text_parts)
        except Exception as e:
            return f"Error extracting text: {str(e)}"

    def extract_metadata(self, text: str) -> Dict:
        metadata = {
            'amount': next((float(amt.replace('$', '').replace(',', ''))
                            for amt in re.findall(r'\$[\d,]+\.?\d*', text)), 0.0),
            'date': next(iter(re.findall(r'\d{1,2}/\d{1,2}/\d{4}', text)), None),
            'account_number': next(iter(re.findall(r'Account\s*#?\s*:?\s*(\d{8,12})', text)), None),
        }
        return metadata

    def get_or_create_person(self, person_data: Dict) -> int:
        # NULL identifiers never satisfy '=' in SQL, so missing values cannot cause false matches
        cursor = self.conn.execute(
            'SELECT id FROM persons WHERE name = ? OR email = ? OR ssn = ? OR drivers_license = ? OR passport = ?',
            (person_data['name'],
             person_data.get('email'),
             person_data.get('id_numbers', {}).get('ssn'),
             person_data.get('id_numbers', {}).get('drivers_license'),
             person_data.get('id_numbers', {}).get('passport'))
        )
        result = cursor.fetchone()
        if result:
            return result[0]

        cursor = self.conn.execute('''
            INSERT INTO persons (name, email, ssn, drivers_license, passport, created_date)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            person_data['name'],
            person_data.get('email'),
            person_data.get('id_numbers', {}).get('ssn'),
            person_data.get('id_numbers', {}).get('drivers_license'),
            person_data.get('id_numbers', {}).get('passport'),
            datetime.now().isoformat()
        ))
        self.conn.commit()
        return cursor.lastrowid

    def process_document(self, pdf_path: str, filename: str) -> Dict:
        text = self.extract_text(pdf_path)
        doc_type = self.classifier.predict(text)
        metadata = self.extract_metadata(text)
        person_data = self.person_identifier.identify_person(text)
        person_id = self.get_or_create_person(person_data)

        cursor = self.conn.execute('''
            INSERT INTO documents (filename, doc_type, person_id, amount, date,
                                   account_number, raw_text, processed_date, confidence_score)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            filename, doc_type, person_id,
            metadata['amount'], metadata['date'], metadata['account_number'],
            text, datetime.now().isoformat(), 0.85
        ))
        doc_id = cursor.lastrowid
        self.conn.commit()

        return {
            'id': doc_id,
            'filename': filename,
            'doc_type': doc_type,
            'person': person_data,
            **metadata
        }

    def process_batch(self, file_paths: List[str]) -> List[Dict]:
        results = []
        for file_path in file_paths:
            try:
                result = self.process_document(file_path, os.path.basename(file_path))
                results.append({"status": "success", "result": result, "file": file_path})
            except Exception as e:
                results.append({"status": "error", "error": str(e), "file": file_path})
        return results


# HTML template with embedded JavaScript
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>Document Processor</title>
</head>
<body>

    <h1>Smart Document Processor</h1>
    <p>Upload and analyze PDF documents with AI</p>

    <div id="drop-zone">
        <p>Drop PDFs here or click to upload</p>
        <p><small>Supports multiple files</small></p>
        <input id="file-input" type="file" name="files[]" accept=".pdf" multiple>
        <button id="upload-btn" type="button">Process</button>
    </div>
    <pre id="results"></pre>

    <script>
        // Post the selected PDFs to /batch_process as files[] and show the JSON response
        document.getElementById('upload-btn').addEventListener('click', async () => {
            const formData = new FormData();
            for (const file of document.getElementById('file-input').files) {
                formData.append('files[]', file);
            }
            const response = await fetch('/batch_process', { method: 'POST', body: formData });
            document.getElementById('results').textContent =
                JSON.stringify(await response.json(), null, 2);
        });
    </script>
</body>
</html>
""" app = Flask(__name__) processor = EnhancedDocProcessor() @app.route('/') def index(): return render_template_string(HTML_TEMPLATE) @app.route('/batch_process', methods=['POST']) def batch_process(): if 'files[]' not in request.files: return jsonify({'error': 'No files uploaded'}), 400 files = request.files.getlist('files[]') with tempfile.TemporaryDirectory() as temp_dir: file_paths = [] for file in files: if file.filename.endswith('.pdf'): secure_name = secure_filename(file.filename) temp_path = os.path.join(temp_dir, secure_name) file.save(temp_path) file_paths.append(temp_path) try: results = processor.process_batch(file_paths) except Exception as e: return jsonify({'error': str(e)}), 500 return jsonify(results) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True)