# MathLens2 / app.py
import os
import uuid
import threading
import base64
from collections import Counter

import cv2
import numpy as np
import torch
import roboflow
from flask import Flask, render_template_string, request, redirect, flash, url_for, jsonify
app = Flask(__name__)
app.secret_key = 'your_secret_key'  # Replace with a secure secret key

# Global dictionary to hold job progress and results:
# jobs[job_id] = {"progress": int, "result": {...}}
jobs = {}
#########################################
# 1. Initialize the Models
#########################################
# --- Roboflow Box Detection Model ---
API_KEY = "wLjPoPYaLmrqCIOFA0RH" # Your Roboflow API key
PROJECT_ID = "base-model-box-r4suo-8lkk1-6dbqh" # Your Roboflow project ID
VERSION_NUMBER = "2" # Your trained model version number
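# NOTE (hedged sketch): hard-coding credentials is risky. A safer pattern is to
# read them from the environment instead, e.g.:
#   API_KEY = os.environ.get("ROBOFLOW_API_KEY", "")
#   app.secret_key = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex())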
try:
    rf = roboflow.Roboflow(api_key=API_KEY)
    workspace = rf.workspace()
    project = workspace.project(PROJECT_ID)
    version = project.version(VERSION_NUMBER)
    box_model = version.model  # This model is trained for detecting boxes
    print("Roboflow model loaded successfully.")
except Exception as e:
    print("Error initializing Roboflow model:", e)
    box_model = None
# --- YOLOv5 Pretrained Model for Persons & Cars ---
try:
    yolov5_model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    print("YOLOv5 model loaded successfully.")
except Exception as e:
    print("Error loading YOLOv5 model:", e)
    yolov5_model = None
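# torch.hub.load fetches the ultralytics/yolov5 repo and the yolov5s weights on
# first use, so the first startup needs network access; later runs use the local cache.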
#########################################
# 2. Helper Functions
#########################################
def compute_iou(boxA, boxB):
    """Intersection-over-Union of two (x1, y1, x2, y2) corner-format boxes."""
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interWidth = max(0, xB - xA)
    interHeight = max(0, yB - yA)
    interArea = interWidth * interHeight
    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    union = boxAArea + boxBArea - interArea
    if union == 0:
        return 0.0
    return interArea / float(union)
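# Worked example (illustrative numbers): boxes (0, 0, 10, 10) and (5, 0, 15, 10)
# intersect over a 5x10 region, so
#   compute_iou((0, 0, 10, 10), (5, 0, 15, 10)) == 50 / (100 + 100 - 50) ≈ 0.333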
# Lower the NMS threshold to 0.3 so that adjacent boxes are less likely to be merged.
def custom_nms(preds, iou_threshold=0.3):
    """Greedy non-maximum suppression: keep the highest-confidence box of each overlapping group."""
    preds = sorted(preds, key=lambda x: x["confidence"], reverse=True)
    filtered_preds = []
    for pred in preds:
        keep = True
        for kept in filtered_preds:
            if compute_iou(pred["box"], kept["box"]) > iou_threshold:
                keep = False
                break
        if keep:
            filtered_preds.append(pred)
    return filtered_preds
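# Example (illustrative values): of two heavily overlapping detections, only the
# higher-confidence one survives, since their IoU (~0.82) exceeds the 0.3 threshold:
#   custom_nms([
#       {"box": (0, 0, 100, 100), "class": "box", "confidence": 0.9},
#       {"box": (5, 5, 105, 105), "class": "box", "confidence": 0.6},
#   ])  # -> keeps only the 0.9 detection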
# The process_image function now uses:
# - Roboflow prediction parameters: confidence=50 and a lower overlap=10.
# - A custom NMS with IoU threshold of 0.3.
# - ArUco marker detection for conversion factor computation.
def process_image(job_id, image_path, object_type, multiplier):
    try:
        jobs[job_id]['progress'] = 10
        # Load the original image
        image = cv2.imread(image_path)
        if image is None:
            jobs[job_id]['progress'] = 100
            jobs[job_id]['result'] = {"error": "Could not read the image."}
            return
        jobs[job_id]['progress'] = 20
        img_height, img_width = image.shape[:2]
        # Set dynamic thickness based on image size and multiplier.
        thickness = max(2, int(min(img_width, img_height) / 300)) * multiplier
        detection_info = []
        if object_type == "box":
            if box_model is None:
                jobs[job_id]['progress'] = 100
                jobs[job_id]['result'] = {"error": "Roboflow model not available."}
                return
            # --- BOX DETECTION ---
            # Upscale if the image is small.
            scale_factor = 1
            if img_width < 1000 or img_height < 1000:
                scale_factor = 2
            # Use improved parameters: confidence=50 and overlap=10 (lowered overlap).
            if scale_factor > 1:
                upscaled_image = cv2.resize(image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR)
                temp_path = f"upscaled_{job_id}.jpg"  # per-job name so concurrent jobs don't clash
                cv2.imwrite(temp_path, upscaled_image)
                results = box_model.predict(temp_path, confidence=50, overlap=10).json()
                os.remove(temp_path)  # clean up the temporary upscaled copy
            else:
                results = box_model.predict(image_path, confidence=50, overlap=10).json()
            predictions = results.get("predictions", [])
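            # Each Roboflow prediction is expected to use center-based pixel
            # coordinates of the image that was submitted (hence the scale_factor
            # division below), roughly:
            # {"x": ..., "y": ..., "width": ..., "height": ..., "class": ..., "confidence": ...}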
            processed_preds = []
            for prediction in predictions:
                try:
                    if scale_factor > 1:
                        x = prediction["x"] / scale_factor
                        y = prediction["y"] / scale_factor
                        width = prediction["width"] / scale_factor
                        height = prediction["height"] / scale_factor
                    else:
                        x = prediction["x"]
                        y = prediction["y"]
                        width = prediction["width"]
                        height = prediction["height"]
                    # Convert center-based coordinates to corner-based bounding box.
                    x1 = int(round(x - width / 2))
                    y1 = int(round(y - height / 2))
                    x2 = int(round(x + width / 2))
                    y2 = int(round(y + height / 2))
                    # Clamp coordinates within the image.
                    x1 = max(0, min(x1, img_width - 1))
                    y1 = max(0, min(y1, img_height - 1))
                    x2 = max(0, min(x2, img_width - 1))
                    y2 = max(0, min(y2, img_height - 1))
                    processed_preds.append({
                        "box": (x1, y1, x2, y2),
                        "class": prediction["class"],
                        "confidence": prediction["confidence"]
                    })
                except Exception:
                    # Skip malformed predictions rather than failing the whole job.
                    continue
            # Apply custom NMS with an IoU threshold of 0.3.
            box_detections = custom_nms(processed_preds, iou_threshold=0.3)
            jobs[job_id]['progress'] = 60
            # --- ARUCO MARKER DETECTION & SIZE CONVERSION ---
            marker_real_width_cm = 5.0  # The printed marker is 5 cm x 5 cm.
            try:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
                if hasattr(cv2.aruco, 'DetectorParameters_create'):
                    aruco_params = cv2.aruco.DetectorParameters_create()
                else:
                    aruco_params = cv2.aruco.DetectorParameters()
                # OpenCV >= 4.7 replaces the free detectMarkers function with ArucoDetector.
                if hasattr(cv2.aruco, 'ArucoDetector'):
                    detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params)
                    corners, ids, _ = detector.detectMarkers(gray)
                else:
                    corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=aruco_params)
                if ids is not None and len(corners) > 0:
                    marker_corners = corners[0].reshape((4, 2))
                    cv2.aruco.drawDetectedMarkers(image, corners, ids)
                    # Compute the marker's bounding box.
                    min_x = np.min(marker_corners[:, 0])
                    max_x = np.max(marker_corners[:, 0])
                    min_y = np.min(marker_corners[:, 1])
                    max_y = np.max(marker_corners[:, 1])
                    width_pixels = max_x - min_x
                    height_pixels = max_y - min_y
                    if width_pixels > 0 and height_pixels > 0:
                        # Use the average conversion factor from width and height.
                        conversion_factor = (marker_real_width_cm / width_pixels + marker_real_width_cm / height_pixels) / 2
                    else:
                        conversion_factor = None
                else:
                    conversion_factor = None
            except Exception:
                # Treat any ArUco failure as "no marker found".
                conversion_factor = None
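            # Worked example (illustrative numbers): a 5 cm marker spanning 200 px
            # gives conversion_factor = 5 / 200 = 0.025 cm/px, so a detected box
            # 400 px wide is reported as 400 * 0.025 = 10.0 cm.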
            # --- Draw Boxes & Compute Sizes ---
            for pred in box_detections:
                x1, y1, x2, y2 = pred["box"]
                label = pred["class"]
                confidence = pred["confidence"]
                cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), thickness)
                if conversion_factor is not None:
                    box_width_cm = (x2 - x1) * conversion_factor
                    box_height_cm = (y2 - y1) * conversion_factor
                    detection_info.append({
                        "class": label,
                        "confidence": f"{confidence:.2f}",
                        "width_cm": f"{box_width_cm:.1f}",
                        "height_cm": f"{box_height_cm:.1f}"
                    })
                else:
                    detection_info.append({
                        "class": label,
                        "confidence": f"{confidence:.2f}",
                        "width_cm": "N/A",
                        "height_cm": "N/A"
                    })
                text = f"{label} ({confidence:.2f})"
                (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                cv2.rectangle(image, (x1, y1 - text_height - baseline - 5), (x1 + text_width, y1 - 5), (0, 255, 0), -1)
                cv2.putText(image, text, (x1, y1 - 5 - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
        elif object_type in {"person", "car"}:
            if yolov5_model is None:
                jobs[job_id]['progress'] = 100
                jobs[job_id]['result'] = {"error": "YOLOv5 model not available."}
                return
            try:
                # YOLOv5 expects RGB input; OpenCV loads images as BGR.
                img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                yolo_results = yolov5_model(img_rgb)
                df = yolo_results.pandas().xyxy[0]
                for _, row in df.iterrows():
                    if row['name'] == object_type:
                        xmin = int(row['xmin'])
                        ymin = int(row['ymin'])
                        xmax = int(row['xmax'])
                        ymax = int(row['ymax'])
                        conf = row['confidence']
                        label = row['name']
                        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (255, 0, 0), thickness)
                        text = f"{label} ({conf:.2f})"
                        (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                        cv2.rectangle(image, (xmin, ymin - text_height - baseline - 5), (xmin + text_width, ymin - 5), (255, 0, 0), -1)
                        cv2.putText(image, text, (xmin, ymin - 5 - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
                        detection_info.append({
                            "class": label,
                            "confidence": f"{conf:.2f}",
                            "width_cm": "N/A",
                            "height_cm": "N/A"
                        })
            except Exception:
                jobs[job_id]['progress'] = 100
                jobs[job_id]['result'] = {"error": "Error during YOLOv5 inference."}
                return
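        # yolov5s is trained on the 80 COCO classes, so object_type must match a
        # COCO class name; 'person' and 'car' both do.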
        # Draw summary text on the image
        detection_counts = Counter(det["class"] for det in detection_info)
        if detection_counts:
            top_text = ", ".join(f"{cls}: {count}" for cls, count in detection_counts.items())
            (info_width, info_height), info_baseline = cv2.getTextSize(top_text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            cv2.rectangle(image, (5, 5), (5 + info_width, 5 + info_height + info_baseline), (0, 255, 0), -1)
            cv2.putText(image, top_text, (5, 5 + info_height), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        # Encode the annotated image as a base64 JPEG for inline display.
        retval, buffer = cv2.imencode('.jpg', image)
        image_data = base64.b64encode(buffer).decode('utf-8')
        jobs[job_id]['result'] = {"image_data": image_data, "detection_info": detection_info}
        # Store the result before reporting 100%, so a client that sees 100%
        # can fetch it immediately without hitting a 404.
        jobs[job_id]['progress'] = 100
    except Exception:
        jobs[job_id]['progress'] = 100
        jobs[job_id]['result'] = {"error": "Unexpected error during processing."}
    finally:
        # Best-effort cleanup of the per-job upload file.
        try:
            os.remove(image_path)
        except OSError:
            pass
#########################################
# 3. HTML Templates
#########################################
landing_template = '''
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MathLens</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap/dist/css/bootstrap.min.css">
    <style>
        @import url('https://fonts.googleapis.com/css2?family=Share+Tech+Mono&display=swap');
        body { background-color: #fff; color: #000; font-family: "Share Tech Mono", monospace;
               text-align: center; display: flex; flex-direction: column; justify-content: center;
               align-items: center; min-height: 100vh; padding: 20px; }
        h1 { font-size: 2.5rem; margin-bottom: 20px; }
        p { font-size: 1.5rem; margin-bottom: 40px; }
        .btn { display: inline-block; margin: 10px; padding: 15px 30px;
               font-size: 1.2rem; text-decoration: none; border: 2px solid #000;
               color: #000; transition: background-color 0.3s, color 0.3s; }
        .btn:hover { background-color: #000; color: #fff; }
    </style>
</head>
<body>
    <h1>MathLens</h1>
    <p>What do you want to count?</p>
    <div>
        <a href="{{ url_for('upload') }}?object_type=person" class="btn">People</a>
        <a href="{{ url_for('upload') }}?object_type=car" class="btn">Cars</a>
        <a href="{{ url_for('upload') }}?object_type=box" class="btn">Boxes</a>
    </div>
</body>
</html>
'''
upload_template = '''
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MathLens - AI Detection & Measurement</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap/dist/css/bootstrap.min.css">
    <style>
        @import url('https://fonts.googleapis.com/css2?family=Share+Tech+Mono&display=swap');
        body { background-color: #fff; color: #000; font-family: "Share Tech Mono", monospace;
               text-align: center; display: flex; flex-direction: column; justify-content: center;
               align-items: center; min-height: 100vh; padding: 20px; }
        .typing-effect { font-size: 2rem; font-weight: bold; margin-bottom: 20px;
                         height: 50px; white-space: nowrap; }
        form { margin-bottom: 20px; }
        input[type="file"], button { display: block; margin: 10px auto; padding: 10px;
                                     background: none; border: 2px solid #000; color: #000;
                                     font-size: 1rem; font-family: "Share Tech Mono", monospace;
                                     cursor: pointer; }
        input[type="file"]::file-selector-button { background: none; border: none; color: #000; }
        .home-btn { display: inline-block; margin: 10px auto 20px; padding: 10px 20px;
                    border: 2px solid #000; color: #000; text-decoration: none;
                    font-family: "Share Tech Mono", monospace; transition: background-color 0.3s, color 0.3s; }
        .home-btn:hover { background-color: #000; color: #fff; }
        /* Progress overlay styles */
        #progressOverlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%;
                           background: rgba(255,255,255,0.9); display: none;
                           align-items: center; justify-content: center; flex-direction: column;
                           z-index: 9999; }
        #progressContainer { width: 80%; max-width: 400px; }
        #progressBar { height: 20px; width: 0; background-color: #000; border-radius: 10px;
                       transition: width 0.2s linear; }
        #progressText { margin-top: 10px; font-size: 1.2rem; }
        .content-wrapper { display: flex; flex-direction: row; align-items: center;
                           justify-content: space-evenly; width: 100%; max-width: 1200px;
                           flex-wrap: wrap; gap: 20px; }
        .result-img { max-width: 100%; border: 2px solid #000; }
        table { width: 100%; max-width: 600px; border-collapse: collapse; }
        th, td { border: 1px solid #000; padding: 5px; text-align: center; }
        .footer { margin-top: 20px; font-size: 0.9em; color: #000; }
        @media (max-width: 768px) {
            .typing-effect { font-size: 1.5rem; }
            .content-wrapper { flex-direction: column; align-items: center; }
            .result-img { max-width: 90%; }
            table { max-width: 100%; }
        }
    </style>
</head>
<body>
    <!-- Home button -->
    <a href="{{ url_for('landing') }}" class="home-btn">Home</a>
    <!-- Progress overlay -->
    <div id="progressOverlay">
        <div id="progressContainer">
            <div id="progressBar"></div>
            <div id="progressText">Starting up... 🛠️</div>
        </div>
    </div>
    <div class="typing-effect" id="typing"></div>
    <!-- The file upload form (submission handled via AJAX) -->
    <form id="uploadForm">
        <input type="file" name="file" accept="image/*" required>
        <!-- Hidden fields to pass the selected object type and device multiplier -->
        <input type="hidden" name="object_type" value="{{ object_type }}">
        <input type="hidden" name="multiplier" id="multiplier" value="1">
        <button type="submit">Analyze Image</button>
    </form>
    <div id="resultContainer">
        {% if image_data or detection_info %}
        <div class="content-wrapper">
            <img src="data:image/jpeg;base64,{{ image_data }}" alt="Processed Image" class="result-img">
            <table>
                <thead>
                    <tr>
                        <th>#</th>
                        <th>Class</th>
                        <th>Confidence</th>
                        <th>Width (cm)</th>
                        <th>Height (cm)</th>
                    </tr>
                </thead>
                <tbody>
                    {% for det in detection_info %}
                    <tr>
                        <td>{{ loop.index }}</td>
                        <td>{{ det.class }}</td>
                        <td>{{ det.confidence }}</td>
                        <td>{{ det.width_cm }}</td>
                        <td>{{ det.height_cm }}</td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </div>
        {% endif %}
    </div>
    <div class="footer">&copy; 2024 MathLens AI Detection App. All rights reserved.</div>
    <script>
        // Handle typing effect
        const textArray = ["MathLens", "Smart Counting with Maths"];
        let textIndex = 0, charIndex = 0, isDeleting = false;
        const typingElement = document.getElementById("typing");
        function typeEffect() {
            let currentText = textArray[textIndex];
            if (isDeleting) {
                typingElement.textContent = currentText.substring(0, charIndex--);
            } else {
                typingElement.textContent = currentText.substring(0, charIndex++);
            }
            if (!isDeleting && charIndex === currentText.length) {
                setTimeout(() => { isDeleting = true; typeEffect(); }, 3000);
            } else if (isDeleting && charIndex === 0) {
                isDeleting = false;
                textIndex = (textIndex + 1) % textArray.length;
                setTimeout(typeEffect, 500);
            } else {
                setTimeout(typeEffect, isDeleting ? 50 : 100);
            }
        }
        document.addEventListener("DOMContentLoaded", () => {
            setTimeout(typeEffect, 500);
            // Detect if the device is mobile and update the thickness multiplier accordingly.
            var isMobile = /Mobi|Android/i.test(navigator.userAgent);
            document.getElementById("multiplier").value = isMobile ? 2 : 1;
        });
        // AJAX-based submission and progress polling
        const uploadForm = document.getElementById("uploadForm");
        uploadForm.addEventListener("submit", function(e) {
            e.preventDefault();
            const formData = new FormData(uploadForm);
            // Show progress overlay
            document.getElementById("progressOverlay").style.display = "flex";
            // Start the analysis job
            fetch("{{ url_for('analyze') }}", {
                method: "POST",
                body: formData
            })
            .then(response => response.json())
            .then(data => {
                const jobId = data.job_id;
                // Start polling progress every 500ms
                const progressInterval = setInterval(() => {
                    fetch("{{ url_for('progress') }}?job_id=" + jobId)
                    .then(response => response.json())
                    .then(progData => {
                        const progress = progData.progress;
                        document.getElementById("progressBar").style.width = progress + "%";
                        if (progress < 10) {
                            document.getElementById("progressText").textContent = "Starting up... 🛠️";
                        } else if (progress < 30) {
                            document.getElementById("progressText").textContent = "Writing scripts... 🤖";
                        } else if (progress < 50) {
                            document.getElementById("progressText").textContent = "Calculating formulas... 🧮";
                        } else if (progress < 70) {
                            document.getElementById("progressText").textContent = "Crunching numbers... 🔢";
                        } else if (progress < 90) {
                            document.getElementById("progressText").textContent = "Almost there... 🚀";
                        } else {
                            document.getElementById("progressText").textContent = "Finalizing... 🏁";
                        }
                        if (progress >= 100) {
                            clearInterval(progressInterval);
                            fetch("{{ url_for('result') }}?job_id=" + jobId)
                            .then(response => response.json())
                            .then(resultData => {
                                document.getElementById("progressOverlay").style.display = "none";
                                if (resultData.error) {
                                    // Surface backend errors instead of rendering an empty image.
                                    document.getElementById("resultContainer").innerHTML =
                                        `<p>${resultData.error}</p>`;
                                    return;
                                }
                                document.getElementById("resultContainer").innerHTML = `
                                    <div class="content-wrapper">
                                        <img src="data:image/jpeg;base64,${resultData.image_data}" alt="Processed Image" class="result-img">
                                        ${buildTableHTML(resultData.detection_info)}
                                    </div>`;
                            });
                        }
                    });
                }, 500);
            });
        });
        function buildTableHTML(detectionInfo) {
            if (!detectionInfo || detectionInfo.length === 0) return "";
            let tableHTML = `<table>
                <thead>
                    <tr>
                        <th>#</th>
                        <th>Class</th>
                        <th>Confidence</th>
                        <th>Width (cm)</th>
                        <th>Height (cm)</th>
                    </tr>
                </thead>
                <tbody>`;
            detectionInfo.forEach((det, index) => {
                tableHTML += `<tr>
                    <td>${index + 1}</td>
                    <td>${det.class}</td>
                    <td>${det.confidence}</td>
                    <td>${det.width_cm}</td>
                    <td>${det.height_cm}</td>
                </tr>`;
            });
            tableHTML += `</tbody></table>`;
            return tableHTML;
        }
    </script>
</body>
</html>
'''
#########################################
# 4. Flask Routes
#########################################
@app.route('/')
def landing():
    return render_template_string(landing_template)
@app.route('/upload', methods=['GET'])
def upload():
    object_type = request.args.get('object_type', '').lower()
    if object_type not in {"person", "car", "box"}:
        flash("Please select a valid object type.")
        return redirect(url_for('landing'))
    return render_template_string(upload_template, object_type=object_type)
@app.route('/analyze', methods=['POST'])
def analyze():
    if 'file' not in request.files:
        return jsonify({"error": "No file provided."}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file."}), 400
    object_type = request.form.get('object_type', '').lower()
    if object_type not in {"person", "car", "box"}:
        return jsonify({"error": "Invalid object type."}), 400
    try:
        multiplier = int(request.form.get('multiplier', 1))
    except ValueError:
        multiplier = 1
    # Use a per-job filename so concurrent uploads don't overwrite each other.
    job_id = str(uuid.uuid4())
    upload_path = f"upload_{job_id}.jpg"
    try:
        file.save(upload_path)
    except Exception:
        return jsonify({"error": "Error saving file."}), 500
    jobs[job_id] = {"progress": 0, "result": None}
    thread = threading.Thread(target=process_image, args=(job_id, upload_path, object_type, multiplier))
    thread.start()
    return jsonify({"job_id": job_id})
@app.route('/progress', methods=['GET'])
def progress():
    job_id = request.args.get('job_id', '')
    if job_id not in jobs:
        return jsonify({"progress": 0})
    return jsonify({"progress": jobs[job_id].get("progress", 0)})
@app.route('/result', methods=['GET'])
def result():
    job_id = request.args.get('job_id', '')
    if job_id not in jobs or jobs[job_id].get("result") is None:
        return jsonify({"error": "Result not available."}), 404
    result = jobs[job_id]["result"]
    # Results are one-shot: drop the job entry once it has been fetched.
    del jobs[job_id]
    return jsonify(result)
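# Typical client flow against these endpoints (hedged sketch; host/port as
# configured in app.run below):
#   curl -F "file=@photo.jpg" -F "object_type=box" -F "multiplier=1" \
#        http://localhost:7860/analyze               # -> {"job_id": "..."}
#   curl "http://localhost:7860/progress?job_id=ID"  # -> {"progress": 60}
#   curl "http://localhost:7860/result?job_id=ID"    # -> {"image_data": ..., "detection_info": [...]}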
#########################################
# 5. Run the App
#########################################
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860, threaded=True)