# app.py from flask import Flask, request, jsonify, render_template_string import PyPDF2 import sqlite3 from datetime import datetime import re import os import hashlib from typing import List, Dict import shutil from sklearn.pipeline import Pipeline from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.naive_bayes import MultinomialNB import numpy as np import joblib import base64 from werkzeug.utils import secure_filename # HTML template with embedded JavaScript HTML_TEMPLATE = """ Document Processor

Smart Document Processor

Upload and analyze PDF documents with AI

Drop PDFs here or click to upload Supports multiple files
""" class MLDocumentClassifier: def __init__(self): self.labels = ['Invoice', 'Statement', 'Contract', 'Receipt', 'Report', 'Letter', 'Form'] self.classifier = Pipeline([ ('tfidf', TfidfVectorizer(ngram_range=(1, 2), stop_words='english', max_features=10000)), ('clf', MultinomialNB()) ]) self.is_trained = False def predict(self, text): return self._rule_based_classify(text) def _rule_based_classify(self, text): text_lower = text.lower() rules = [ ('Invoice', ['invoice', 'bill', 'payment due', 'amount due']), ('Statement', ['statement', 'balance', 'transaction history']), ('Contract', ['contract', 'agreement', 'terms and conditions']), ('Receipt', ['receipt', 'purchased', 'payment received']), ('Report', ['report', 'analysis', 'findings']), ('Letter', ['dear', 'sincerely', 'regards']), ('Form', ['form', 'please fill', 'application']) ] scores = [] for doc_type, keywords in rules: score = sum(1 for keyword in keywords if keyword in text_lower) scores.append((doc_type, score / len(keywords) if keywords else 0)) scores.sort(key=lambda x: x[1], reverse=True) return scores[0][0] class EnhancedDocProcessor: def __init__(self): self.conn = sqlite3.connect(':memory:', check_same_thread=False) self.setup_database() self.classifier = MLDocumentClassifier() def setup_database(self): self.conn.executescript(''' CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY, filename TEXT, doc_type TEXT, person_name TEXT, amount REAL, date TEXT, account_number TEXT, raw_text TEXT, processed_date TEXT, file_hash TEXT, version INTEGER, user_id TEXT ); CREATE TABLE IF NOT EXISTS similar_docs ( doc_id INTEGER, similar_doc_id INTEGER, similarity_score REAL, FOREIGN KEY (doc_id) REFERENCES documents (id), FOREIGN KEY (similar_doc_id) REFERENCES documents (id) ); ''') self.conn.commit() def extract_text(self, pdf_path: str) -> str: try: text_parts = [] with open(pdf_path, 'rb') as file: reader = PyPDF2.PdfReader(file) for page in reader.pages: text = page.extract_text() if text: text_parts.append(text) return "\n".join(text_parts) except Exception as e: return f"Error extracting text: {str(e)}" def extract_metadata(self, text: str) -> Dict: return { 'amount': next((float(amt.replace('$','').replace(',','')) for amt in re.findall(r'\$[\d,]+\.?\d*', text)), 0.0), 'date': next(iter(re.findall(r'\d{1,2}/\d{1,2}/\d{4}', text)), None), 'account_number': next(iter(re.findall(r'Account\s*#?\s*:?\s*(\d{8,12})', text)), None), 'person_name': next(iter(re.findall(r'(?:Mr\.|Mrs\.|Ms\.|Dr\.)\s+([A-Z][a-z]+\s+[A-Z][a-z]+)', text)), "Unknown") } def process_document(self, pdf_path: str, filename: str, user_id: str = None) -> Dict: text = self.extract_text(pdf_path) doc_type = self.classifier.predict(text) metadata = self.extract_metadata(text) cursor = self.conn.execute(''' INSERT INTO documents (filename, doc_type, person_name, amount, date, account_number, raw_text, processed_date, user_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( filename, doc_type, metadata['person_name'], metadata['amount'], metadata['date'], metadata['account_number'], text, datetime.now().isoformat(), user_id )) doc_id = cursor.lastrowid self.conn.commit() return { 'id': doc_id, 'filename': filename, 'doc_type': doc_type, **metadata } def process_batch(self, file_paths: List[str], user_id: str = None) -> List[Dict]: results = [] for file_path in file_paths: try: result = self.process_document(file_path, os.path.basename(file_path), user_id) results.append({"status": "success", "result": result, "file": file_path}) except Exception as e: results.append({"status": "error", "error": str(e), "file": file_path}) return results app = Flask(__name__) processor = EnhancedDocProcessor() @app.route('/') def index(): return render_template_string(HTML_TEMPLATE) @app.route('/batch_process', methods=['POST']) def batch_process(): if 'files[]' not in request.files: return jsonify({'error': 'No files uploaded'}), 400 files = request.files.getlist('files[]') user_id = request.form.get('user_id') file_paths = [] for file in files: if file.filename.endswith('.pdf'): temp_path = f"temp_{file.filename}" file.save(temp_path) file_paths.append(temp_path) try: results = processor.process_batch(file_paths, user_id) except Exception as e: return jsonify({'error': str(e)}), 500 finally: # Clean up temporary files for path in file_paths: try: os.remove(path) except: pass return jsonify(results) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=True)