# app.py
from flask import Flask, request, jsonify, render_template_string
import PyPDF2
import sqlite3
from datetime import datetime
import re
import os
import hashlib
from typing import List, Dict
import shutil
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
import numpy as np
import joblib
import base64
from werkzeug.utils import secure_filename
# HTML template for the upload page (the original embedded markup/JS was stripped; minimal reconstruction).
HTML_TEMPLATE = """
<html>
<head><title>Document Processor</title></head>
<body>
  <h1>Smart Document Processor</h1>
  <p>Upload and analyze PDF documents with AI</p>
  <form action="/batch_process" method="post" enctype="multipart/form-data">
    <h3>Selected Files</h3>
    <input type="file" name="files[]" multiple accept=".pdf">
    <button type="submit">Process</button>
  </form>
</body></html>
"""
class MLDocumentClassifier:
    def __init__(self):
        self.labels = ['Invoice', 'Statement', 'Contract', 'Receipt', 'Report', 'Letter', 'Form']
        # TF-IDF + Naive Bayes pipeline; it is only consulted once it has been trained.
        self.classifier = Pipeline([
            ('tfidf', TfidfVectorizer(ngram_range=(1, 2), stop_words='english', max_features=10000)),
            ('clf', MultinomialNB())
        ])
        self.is_trained = False

    def predict(self, text):
        # Fall back to keyword rules until the sklearn pipeline has been fitted.
        if self.is_trained:
            return self.classifier.predict([text])[0]
        return self._rule_based_classify(text)

    def _rule_based_classify(self, text):
        text_lower = text.lower()
        rules = [
            ('Invoice', ['invoice', 'bill', 'payment due', 'amount due']),
            ('Statement', ['statement', 'balance', 'transaction history']),
            ('Contract', ['contract', 'agreement', 'terms and conditions']),
            ('Receipt', ['receipt', 'purchased', 'payment received']),
            ('Report', ['report', 'analysis', 'findings']),
            ('Letter', ['dear', 'sincerely', 'regards']),
            ('Form', ['form', 'please fill', 'application'])
        ]
        scores = []
        for doc_type, keywords in rules:
            score = sum(1 for keyword in keywords if keyword in text_lower)
            scores.append((doc_type, score / len(keywords) if keywords else 0))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[0][0]
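
    # Illustrative sketch (not part of the original file): how the pipeline above could be fitted
    # and persisted with the already-imported joblib. The training data (texts, labels) and the
    # model_path argument are assumptions for illustration only.
    def train(self, texts, labels, model_path=None):
        """Fit the TF-IDF + Naive Bayes pipeline on labelled texts and optionally save it."""
        self.classifier.fit(texts, labels)
        self.is_trained = True
        if model_path:
            joblib.dump(self.classifier, model_path)

    def load(self, model_path):
        """Load a previously saved pipeline so predict() can use it instead of the keyword rules."""
        self.classifier = joblib.load(model_path)
        self.is_trained = True
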
class EnhancedDocProcessor:
    def __init__(self):
        # In-memory database shared across Flask's request threads; contents are lost on restart.
        self.conn = sqlite3.connect(':memory:', check_same_thread=False)
        self.setup_database()
        self.classifier = MLDocumentClassifier()

    def setup_database(self):
        self.conn.executescript('''
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY,
                filename TEXT,
                doc_type TEXT,
                person_name TEXT,
                amount REAL,
                date TEXT,
                account_number TEXT,
                raw_text TEXT,
                processed_date TEXT,
                file_hash TEXT,
                version INTEGER,
                user_id TEXT
            );
            CREATE TABLE IF NOT EXISTS similar_docs (
                doc_id INTEGER,
                similar_doc_id INTEGER,
                similarity_score REAL,
                FOREIGN KEY (doc_id) REFERENCES documents (id),
                FOREIGN KEY (similar_doc_id) REFERENCES documents (id)
            );
        ''')
        self.conn.commit()
    def extract_text(self, pdf_path: str) -> str:
        try:
            text_parts = []
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                for page in reader.pages:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
            return "\n".join(text_parts)
        except Exception as e:
            return f"Error extracting text: {str(e)}"
    def extract_metadata(self, text: str) -> Dict:
        # Grab the first dollar amount, MM/DD/YYYY date, account number, and titled name in the text.
        return {
            'amount': next((float(amt.replace('$', '').replace(',', ''))
                            for amt in re.findall(r'\$[\d,]+\.?\d*', text)), 0.0),
            'date': next(iter(re.findall(r'\d{1,2}/\d{1,2}/\d{4}', text)), None),
            'account_number': next(iter(re.findall(r'Account\s*#?\s*:?\s*(\d{8,12})', text)), None),
            'person_name': next(iter(re.findall(r'(?:Mr\.|Mrs\.|Ms\.|Dr\.)\s+([A-Z][a-z]+\s+[A-Z][a-z]+)', text)), "Unknown")
        }
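
    # Illustrative helper sketch (not part of the original file): the documents table defines a
    # file_hash column that nothing fills. A digest like this one could be stored to detect
    # duplicate uploads; the name _file_hash is an assumption.
    def _file_hash(self, pdf_path: str) -> str:
        """Return the SHA-256 hex digest of the file at pdf_path, read in chunks."""
        digest = hashlib.sha256()
        with open(pdf_path, 'rb') as file:
            for chunk in iter(lambda: file.read(8192), b''):
                digest.update(chunk)
        return digest.hexdigest()
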
    def process_document(self, pdf_path: str, filename: str, user_id: str = None) -> Dict:
        text = self.extract_text(pdf_path)
        doc_type = self.classifier.predict(text)
        metadata = self.extract_metadata(text)
        cursor = self.conn.execute('''
            INSERT INTO documents
                (filename, doc_type, person_name, amount, date,
                 account_number, raw_text, processed_date, user_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            filename, doc_type, metadata['person_name'],
            metadata['amount'], metadata['date'],
            metadata['account_number'], text,
            datetime.now().isoformat(), user_id
        ))
        doc_id = cursor.lastrowid
        self.conn.commit()
        return {
            'id': doc_id,
            'filename': filename,
            'doc_type': doc_type,
            **metadata
        }
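
    # Illustrative sketch (not part of the original file): the similar_docs table is created but
    # never populated. One option is pairwise TF-IDF cosine similarity against stored documents;
    # the method name link_similar_documents and the 0.3 threshold are assumptions.
    def link_similar_documents(self, doc_id: int, threshold: float = 0.3) -> None:
        from sklearn.metrics.pairwise import cosine_similarity
        rows = self.conn.execute(
            'SELECT id, raw_text FROM documents WHERE raw_text IS NOT NULL').fetchall()
        ids = [row[0] for row in rows]
        if len(rows) < 2 or doc_id not in ids:
            return
        texts = [row[1] for row in rows]
        matrix = TfidfVectorizer(stop_words='english').fit_transform(texts)
        scores = cosine_similarity(matrix[ids.index(doc_id)], matrix)[0]
        for other_id, score in zip(ids, scores):
            if other_id != doc_id and score >= threshold:
                self.conn.execute(
                    'INSERT INTO similar_docs (doc_id, similar_doc_id, similarity_score) VALUES (?, ?, ?)',
                    (doc_id, other_id, float(score)))
        self.conn.commit()
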
    def process_batch(self, file_paths: List[str], user_id: str = None) -> List[Dict]:
        results = []
        for file_path in file_paths:
            try:
                result = self.process_document(file_path, os.path.basename(file_path), user_id)
                results.append({"status": "success", "result": result, "file": file_path})
            except Exception as e:
                results.append({"status": "error", "error": str(e), "file": file_path})
        return results

app = Flask(__name__)
processor = EnhancedDocProcessor()


@app.route('/')
def index():
    return render_template_string(HTML_TEMPLATE)

@app.route('/batch_process', methods=['POST'])
def batch_process():
    if 'files[]' not in request.files:
        return jsonify({'error': 'No files uploaded'}), 400
    files = request.files.getlist('files[]')
    user_id = request.form.get('user_id')
    file_paths = []
    for file in files:
        # Sanitize the client-supplied filename before using it in a filesystem path.
        filename = secure_filename(file.filename or '')
        if filename.lower().endswith('.pdf'):
            temp_path = f"temp_{filename}"
            file.save(temp_path)
            file_paths.append(temp_path)
    try:
        results = processor.process_batch(file_paths, user_id)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Clean up temporary files
        for path in file_paths:
            try:
                os.remove(path)
            except OSError:
                pass
    return jsonify(results)
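
# Example invocation (assuming the server is running locally on port 7860; sample.pdf and
# user_id=demo are placeholders):
#   curl -F "files[]=@sample.pdf" -F "user_id=demo" http://localhost:7860/batch_process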

if __name__ == '__main__':
    # debug=True is convenient for local development; disable it before exposing the app publicly.
    app.run(host='0.0.0.0', port=7860, debug=True)