import os
import uuid
import threading
import base64
from collections import Counter

import cv2
import numpy as np
from flask import Flask, render_template_string, request, redirect, flash, url_for, jsonify
import roboflow
import torch

app = Flask(__name__)
# Use a real secret from the environment in production; the literal is a dev fallback.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "your_secret_key")

# In-memory job store shared with the worker threads:
#   job_id -> {"progress": int (0-100), "result": dict or None}
jobs = {}

# Roboflow model configuration. Prefer supplying the API key via the
# environment; the hard-coded value is only a local-testing fallback.
API_KEY = os.environ.get("ROBOFLOW_API_KEY", "wLjPoPYaLmrqCIOFA0RH")
PROJECT_ID = "base-model-box-r4suo-8lkk1-6dbqh"
VERSION_NUMBER = "2"

# Load the Roboflow box-detection model once at startup.
try:
    rf = roboflow.Roboflow(api_key=API_KEY)
    workspace = rf.workspace()
    project = workspace.project(PROJECT_ID)
    version = project.version(VERSION_NUMBER)
    box_model = version.model
    print("Roboflow model loaded successfully.")
except Exception as e:
    print("Error initializing Roboflow model:", e)
    box_model = None

# Load a pretrained YOLOv5s model (via torch.hub) for person/car detection.
try:
    yolov5_model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    print("YOLOv5 model loaded successfully.")
except Exception as e:
    print("Error loading YOLOv5 model:", e)
    yolov5_model = None

def compute_iou(boxA, boxB):
    """Return the intersection-over-union of two (x1, y1, x2, y2) boxes."""
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interWidth = max(0, xB - xA)
    interHeight = max(0, yB - yA)
    interArea = interWidth * interHeight
    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    if boxAArea + boxBArea - interArea == 0:
        return 0
    return interArea / float(boxAArea + boxBArea - interArea)
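
# Worked example: boxes (0, 0, 10, 10) and (5, 0, 15, 10) overlap in a 5x10
# region, so compute_iou returns 50 / (100 + 100 - 50) = 1/3.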


def custom_nms(preds, iou_threshold=0.3):
    """Greedy non-maximum suppression: keep the highest-confidence prediction
    and drop any later one whose box overlaps a kept box by more than
    iou_threshold."""
    preds = sorted(preds, key=lambda x: x["confidence"], reverse=True)
    filtered_preds = []
    for pred in preds:
        keep = True
        for kept in filtered_preds:
            if compute_iou(pred["box"], kept["box"]) > iou_threshold:
                keep = False
                break
        if keep:
            filtered_preds.append(pred)
    return filtered_preds
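
# Example: given two near-duplicate detections of the same carton,
#   custom_nms([{"box": (0, 0, 100, 100), "class": "box", "confidence": 0.9},
#               {"box": (2, 1, 101, 99), "class": "box", "confidence": 0.6}])
# keeps only the 0.9-confidence one (their IoU is ~0.95, above the threshold).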


def process_image(job_id, image_path, object_type, multiplier):
    """Background worker: run detection on image_path, draw the results onto
    the image, and store a base64 JPEG plus per-detection details in
    jobs[job_id]['result']."""
    try:
        jobs[job_id]['progress'] = 10

        image = cv2.imread(image_path)
        if image is None:
            jobs[job_id]['progress'] = 100
            jobs[job_id]['result'] = {"error": "Could not read the image."}
            return

        jobs[job_id]['progress'] = 20
        img_height, img_width = image.shape[:2]

        # Scale line thickness with image size and with the client-supplied
        # multiplier (2 on mobile devices, 1 otherwise).
        thickness = max(2, int(min(img_width, img_height) / 300)) * multiplier
        detection_info = []
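
        # Example: a 1200x900 photo viewed on mobile (multiplier 2) gets
        # thickness = max(2, int(900 / 300)) * 2 = 6 px.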

        if object_type == "box":
            if box_model is None:
                jobs[job_id]['progress'] = 100
                jobs[job_id]['result'] = {"error": "Roboflow model not available."}
                return

            # Upscale small images before inference so the model sees enough
            # pixels; predicted coordinates are mapped back down afterwards.
            scale_factor = 1
            if img_width < 1000 or img_height < 1000:
                scale_factor = 2

            if scale_factor > 1:
                upscaled_image = cv2.resize(image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR)
                # Job-specific temp file so concurrent jobs don't clobber each other.
                temp_path = f"upscaled_{job_id}.jpg"
                cv2.imwrite(temp_path, upscaled_image)
                results = box_model.predict(temp_path, confidence=50, overlap=10).json()
                os.remove(temp_path)
            else:
                results = box_model.predict(image_path, confidence=50, overlap=10).json()
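
            # Example: with scale_factor 2, a predicted centre at x=400 in the
            # upscaled image maps back to x=200 in the original photo.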

            predictions = results.get("predictions", [])
            processed_preds = []
            for prediction in predictions:
                try:
                    # Roboflow reports a centre point plus width/height; undo
                    # any upscaling, then convert to corner coordinates.
                    x = prediction["x"] / scale_factor
                    y = prediction["y"] / scale_factor
                    width = prediction["width"] / scale_factor
                    height = prediction["height"] / scale_factor

                    x1 = int(round(x - width / 2))
                    y1 = int(round(y - height / 2))
                    x2 = int(round(x + width / 2))
                    y2 = int(round(y + height / 2))

                    # Clamp to the image bounds.
                    x1 = max(0, min(x1, img_width - 1))
                    y1 = max(0, min(y1, img_height - 1))
                    x2 = max(0, min(x2, img_width - 1))
                    y2 = max(0, min(y2, img_height - 1))
                    processed_preds.append({
                        "box": (x1, y1, x2, y2),
                        "class": prediction["class"],
                        "confidence": prediction["confidence"]
                    })
                except (KeyError, TypeError):
                    continue

            # The hosted overlap filter can still leave near-duplicates, so
            # run a second, stricter NMS pass locally.
            box_detections = custom_nms(processed_preds, iou_threshold=0.3)
            jobs[job_id]['progress'] = 60

            # Calibrate pixels -> centimetres from a printed 5 cm ArUco marker
            # (DICT_6X6_250), if one is visible in the photo.
            marker_real_width_cm = 5.0
            try:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
                if hasattr(cv2.aruco, 'ArucoDetector'):
                    # OpenCV >= 4.7 replaces the module-level detectMarkers
                    # with the ArucoDetector class.
                    detector = cv2.aruco.ArucoDetector(aruco_dict, cv2.aruco.DetectorParameters())
                    corners, ids, _ = detector.detectMarkers(gray)
                else:
                    aruco_params = cv2.aruco.DetectorParameters_create()
                    corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=aruco_params)
                if ids is not None and len(corners) > 0:
                    marker_corners = corners[0].reshape((4, 2))
                    cv2.aruco.drawDetectedMarkers(image, corners, ids)

                    # Use the marker's axis-aligned bounding box in pixels.
                    min_x = np.min(marker_corners[:, 0])
                    max_x = np.max(marker_corners[:, 0])
                    min_y = np.min(marker_corners[:, 1])
                    max_y = np.max(marker_corners[:, 1])
                    width_pixels = max_x - min_x
                    height_pixels = max_y - min_y
                    if width_pixels > 0 and height_pixels > 0:
                        # Average the horizontal and vertical cm-per-pixel estimates.
                        conversion_factor = (marker_real_width_cm / width_pixels + marker_real_width_cm / height_pixels) / 2
                    else:
                        conversion_factor = None
                else:
                    conversion_factor = None
            except Exception:
                conversion_factor = None
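
            # Example: a 5 cm marker spanning 100 px horizontally and 110 px
            # vertically yields (5/100 + 5/110) / 2 ≈ 0.048 cm per pixel, so a
            # 400 px-wide box is reported as about 19 cm wide.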

            # Draw each kept box and attach cm dimensions when calibrated.
            for pred in box_detections:
                x1, y1, x2, y2 = pred["box"]
                label = pred["class"]
                confidence = pred["confidence"]
                cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), thickness)
                if conversion_factor is not None:
                    box_width_cm = (x2 - x1) * conversion_factor
                    box_height_cm = (y2 - y1) * conversion_factor
                    detection_info.append({
                        "class": label,
                        "confidence": f"{confidence:.2f}",
                        "width_cm": f"{box_width_cm:.1f}",
                        "height_cm": f"{box_height_cm:.1f}"
                    })
                else:
                    detection_info.append({
                        "class": label,
                        "confidence": f"{confidence:.2f}",
                        "width_cm": "N/A",
                        "height_cm": "N/A"
                    })
                text = f"{label} ({confidence:.2f})"
                (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                cv2.rectangle(image, (x1, y1 - text_height - baseline - 5), (x1 + text_width, y1 - 5), (0, 255, 0), -1)
                cv2.putText(image, text, (x1, y1 - 5 - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

        elif object_type in {"person", "car"}:
            if yolov5_model is None:
                jobs[job_id]['progress'] = 100
                jobs[job_id]['result'] = {"error": "YOLOv5 model not available."}
                return
            try:
                # YOLOv5 expects RGB input; OpenCV loads images as BGR.
                img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                yolo_results = yolov5_model(img_rgb)
                df = yolo_results.pandas().xyxy[0]
                for _, row in df.iterrows():
                    if row['name'] != object_type:
                        continue
                    xmin = int(row['xmin'])
                    ymin = int(row['ymin'])
                    xmax = int(row['xmax'])
                    ymax = int(row['ymax'])
                    conf = row['confidence']
                    label = row['name']
                    cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (255, 0, 0), thickness)
                    text = f"{label} ({conf:.2f})"
                    (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                    cv2.rectangle(image, (xmin, ymin - text_height - baseline - 5), (xmin + text_width, ymin - 5), (255, 0, 0), -1)
                    cv2.putText(image, text, (xmin, ymin - 5 - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
                    # No size calibration for people/cars; only counts are reported.
                    detection_info.append({
                        "class": label,
                        "confidence": f"{conf:.2f}",
                        "width_cm": "N/A",
                        "height_cm": "N/A"
                    })
            except Exception:
                jobs[job_id]['progress'] = 100
                jobs[job_id]['result'] = {"error": "Error during YOLOv5 inference."}
                return

        # Summary banner (e.g. "box: 3") in the top-left corner.
        detection_counts = Counter(det["class"] for det in detection_info)
        if detection_counts:
            top_text = ", ".join(f"{cls}: {count}" for cls, count in detection_counts.items())
            (info_width, info_height), info_baseline = cv2.getTextSize(top_text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            cv2.rectangle(image, (5, 5), (5 + info_width, 5 + info_height + info_baseline), (0, 255, 0), -1)
            cv2.putText(image, top_text, (5, 5 + info_height), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

        jobs[job_id]['progress'] = 100
        ok, buffer = cv2.imencode('.jpg', image)
        if not ok:
            jobs[job_id]['result'] = {"error": "Could not encode the result image."}
            return
        image_data = base64.b64encode(buffer).decode('utf-8')
        jobs[job_id]['result'] = {"image_data": image_data, "detection_info": detection_info}
    except Exception:
        jobs[job_id]['progress'] = 100
        jobs[job_id]['result'] = {"error": "Unexpected error during processing."}
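
# A finished job stores a payload shaped like (values illustrative):
#   {"image_data": "<base64-encoded JPEG>",
#    "detection_info": [{"class": "box", "confidence": "0.87",
#                        "width_cm": "31.5", "height_cm": "22.0"}, ...]}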


landing_template = '''
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MathLens</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
    <style>
        @import url('https://fonts.googleapis.com/css2?family=Share+Tech+Mono&display=swap');
        body { background-color: #fff; color: #000; font-family: "Share Tech Mono", monospace;
               text-align: center; display: flex; flex-direction: column; justify-content: center;
               align-items: center; min-height: 100vh; padding: 20px; }
        h1 { font-size: 2.5rem; margin-bottom: 20px; }
        p { font-size: 1.5rem; margin-bottom: 40px; }
        .btn { display: inline-block; margin: 10px; padding: 15px 30px;
               font-size: 1.2rem; text-decoration: none; border: 2px solid #000;
               color: #000; transition: background-color 0.3s, color 0.3s; }
        .btn:hover { background-color: #000; color: #fff; }
    </style>
</head>
<body>
    <h1>MathLens</h1>
    <p>What do you want to count?</p>
    <div>
        <a href="{{ url_for('upload') }}?object_type=person" class="btn">People</a>
        <a href="{{ url_for('upload') }}?object_type=car" class="btn">Cars</a>
        <a href="{{ url_for('upload') }}?object_type=box" class="btn">Boxes</a>
    </div>
</body>
</html>
'''

upload_template = '''
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MathLens - AI Detection & Measurement</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
    <style>
        @import url('https://fonts.googleapis.com/css2?family=Share+Tech+Mono&display=swap');
        body { background-color: #fff; color: #000; font-family: "Share Tech Mono", monospace;
               text-align: center; display: flex; flex-direction: column; justify-content: center;
               align-items: center; min-height: 100vh; padding: 20px; }
        .typing-effect { font-size: 2rem; font-weight: bold; margin-bottom: 20px;
                         height: 50px; white-space: nowrap; }
        form { margin-bottom: 20px; }
        input[type="file"], button { display: block; margin: 10px auto; padding: 10px;
                                     background: none; border: 2px solid #000; color: #000;
                                     font-size: 1rem; font-family: "Share Tech Mono", monospace;
                                     cursor: pointer; }
        input[type="file"]::file-selector-button { background: none; border: none; color: #000; }
        .home-btn { display: inline-block; margin: 10px auto 20px; padding: 10px 20px;
                    border: 2px solid #000; color: #000; text-decoration: none;
                    font-family: "Share Tech Mono", monospace; transition: background-color 0.3s, color 0.3s; }
        .home-btn:hover { background-color: #000; color: #fff; }
        /* Progress overlay styles */
        #progressOverlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%;
                           background: rgba(255,255,255,0.9); display: none;
                           align-items: center; justify-content: center; flex-direction: column;
                           z-index: 9999; }
        #progressContainer { width: 80%; max-width: 400px; }
        #progressBar { height: 20px; width: 0; background-color: #000; border-radius: 10px;
                       transition: width 0.2s linear; }
        #progressText { margin-top: 10px; font-size: 1.2rem; }
        .content-wrapper { display: flex; flex-direction: row; align-items: center;
                           justify-content: space-evenly; width: 100%; max-width: 1200px;
                           flex-wrap: wrap; gap: 20px; }
        .result-img { max-width: 100%; border: 2px solid #000; }
        table { width: 100%; max-width: 600px; border-collapse: collapse; }
        th, td { border: 1px solid #000; padding: 5px; text-align: center; }
        .footer { margin-top: 20px; font-size: 0.9em; color: #000; }
        @media (max-width: 768px) {
            .typing-effect { font-size: 1.5rem; }
            .content-wrapper { flex-direction: column; align-items: center; }
            .result-img { max-width: 90%; }
            table { max-width: 100%; }
        }
    </style>
</head>
<body>
    <!-- Home button -->
    <a href="{{ url_for('landing') }}" class="home-btn">Home</a>
    <!-- Progress overlay -->
    <div id="progressOverlay">
        <div id="progressContainer">
            <div id="progressBar"></div>
            <div id="progressText">Starting up... 🛠️</div>
        </div>
    </div>
    <div class="typing-effect" id="typing"></div>
    <!-- The file upload form (submission handled via AJAX) -->
    <form id="uploadForm">
        <input type="file" name="file" accept="image/*" required>
        <!-- Hidden fields to pass the selected object type and device multiplier -->
        <input type="hidden" name="object_type" value="{{ object_type }}">
        <input type="hidden" name="multiplier" id="multiplier" value="1">
        <button type="submit">Analyze Image</button>
    </form>
    <div id="resultContainer">
        {% if image_data or detection_info %}
        <div class="content-wrapper">
            <img src="data:image/jpeg;base64,{{ image_data }}" alt="Processed Image" class="result-img">
            <table>
                <thead>
                    <tr>
                        <th>#</th>
                        <th>Class</th>
                        <th>Confidence</th>
                        <th>Width (cm)</th>
                        <th>Height (cm)</th>
                    </tr>
                </thead>
                <tbody>
                    {% for det in detection_info %}
                    <tr>
                        <td>{{ loop.index }}</td>
                        <td>{{ det.class }}</td>
                        <td>{{ det.confidence }}</td>
                        <td>{{ det.width_cm }}</td>
                        <td>{{ det.height_cm }}</td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </div>
        {% endif %}
    </div>
    <div class="footer">© 2024 MathLens AI Detection App. All rights reserved.</div>
    <script>
        // Typing effect for the page title
        const textArray = ["MathLens", "Smart Counting with Maths"];
        let textIndex = 0, charIndex = 0, isDeleting = false;
        const typingElement = document.getElementById("typing");
        function typeEffect() {
            let currentText = textArray[textIndex];
            if (isDeleting) {
                typingElement.textContent = currentText.substring(0, charIndex--);
            } else {
                typingElement.textContent = currentText.substring(0, charIndex++);
            }
            if (!isDeleting && charIndex === currentText.length) {
                setTimeout(() => { isDeleting = true; typeEffect(); }, 3000);
            } else if (isDeleting && charIndex === 0) {
                isDeleting = false;
                textIndex = (textIndex + 1) % textArray.length;
                setTimeout(typeEffect, 500);
            } else {
                setTimeout(typeEffect, isDeleting ? 50 : 100);
            }
        }
        document.addEventListener("DOMContentLoaded", () => {
            setTimeout(typeEffect, 500);
            // Detect mobile devices and bump the line-thickness multiplier.
            var isMobile = /Mobi|Android/i.test(navigator.userAgent);
            document.getElementById("multiplier").value = isMobile ? 2 : 1;
        });
        // AJAX-based submission and progress polling
        const uploadForm = document.getElementById("uploadForm");
        uploadForm.addEventListener("submit", function(e) {
            e.preventDefault();
            const formData = new FormData(uploadForm);
            // Show progress overlay
            document.getElementById("progressOverlay").style.display = "flex";
            // Start the analysis job
            fetch("{{ url_for('analyze') }}", {
                method: "POST",
                body: formData
            })
            .then(response => response.json())
            .then(data => {
                if (data.error) {
                    document.getElementById("progressOverlay").style.display = "none";
                    alert(data.error);
                    return;
                }
                const jobId = data.job_id;
                // Poll progress every 500 ms
                const progressInterval = setInterval(() => {
                    fetch("{{ url_for('progress') }}?job_id=" + jobId)
                    .then(response => response.json())
                    .then(progData => {
                        const progress = progData.progress;
                        document.getElementById("progressBar").style.width = progress + "%";
                        if (progress < 10) {
                            document.getElementById("progressText").textContent = "Starting up... 🛠️";
                        } else if (progress < 30) {
                            document.getElementById("progressText").textContent = "Writing scripts... 🤖";
                        } else if (progress < 50) {
                            document.getElementById("progressText").textContent = "Calculating formulas... 🧮";
                        } else if (progress < 70) {
                            document.getElementById("progressText").textContent = "Crunching numbers... 🔢";
                        } else if (progress < 90) {
                            document.getElementById("progressText").textContent = "Almost there...";
                        } else {
                            document.getElementById("progressText").textContent = "Finalizing...";
                        }
                        if (progress >= 100) {
                            clearInterval(progressInterval);
                            fetch("{{ url_for('result') }}?job_id=" + jobId)
                            .then(response => response.json())
                            .then(resultData => {
                                document.getElementById("progressOverlay").style.display = "none";
                                if (resultData.error) {
                                    document.getElementById("resultContainer").innerHTML =
                                        `<p>${resultData.error}</p>`;
                                    return;
                                }
                                document.getElementById("resultContainer").innerHTML = `
                                    <div class="content-wrapper">
                                        <img src="data:image/jpeg;base64,${resultData.image_data}" alt="Processed Image" class="result-img">
                                        ${buildTableHTML(resultData.detection_info)}
                                    </div>`;
                            });
                        }
                    });
                }, 500);
            });
        });
        function buildTableHTML(detectionInfo) {
            if (!detectionInfo || detectionInfo.length === 0) return "";
            let tableHTML = `<table>
                <thead>
                    <tr>
                        <th>#</th>
                        <th>Class</th>
                        <th>Confidence</th>
                        <th>Width (cm)</th>
                        <th>Height (cm)</th>
                    </tr>
                </thead>
                <tbody>`;
            detectionInfo.forEach((det, index) => {
                tableHTML += `<tr>
                    <td>${index+1}</td>
                    <td>${det.class}</td>
                    <td>${det.confidence}</td>
                    <td>${det.width_cm}</td>
                    <td>${det.height_cm}</td>
                </tr>`;
            });
            tableHTML += `</tbody></table>`;
            return tableHTML;
        }
    </script>
</body>
</html>
'''


@app.route('/')
def landing():
    return render_template_string(landing_template)


@app.route('/upload', methods=['GET'])
def upload():
    object_type = request.args.get('object_type', '').lower()
    if object_type not in {"person", "car", "box"}:
        flash("Please select a valid object type.")
        return redirect(url_for('landing'))
    return render_template_string(upload_template, object_type=object_type)


@app.route('/analyze', methods=['POST'])
def analyze():
    if 'file' not in request.files:
        return jsonify({"error": "No file provided."}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file."}), 400
    object_type = request.form.get('object_type', '').lower()
    if object_type not in {"person", "car", "box"}:
        return jsonify({"error": "Invalid object type."}), 400
    try:
        multiplier = int(request.form.get('multiplier', 1))
    except ValueError:
        multiplier = 1

    # Save to a job-specific path so concurrent uploads don't overwrite each other.
    job_id = str(uuid.uuid4())
    upload_path = f"uploaded_{job_id}.jpg"
    try:
        file.save(upload_path)
    except Exception:
        return jsonify({"error": "Error saving file."}), 500

    jobs[job_id] = {"progress": 0, "result": None}
    thread = threading.Thread(target=process_image, args=(job_id, upload_path, object_type, multiplier))
    thread.start()
    return jsonify({"job_id": job_id})
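
# Client flow (mirrors the page's JavaScript):
#   POST /analyze   multipart form: file, object_type, multiplier -> {"job_id": "..."}
#   GET  /progress?job_id=...   poll every 500 ms                 -> {"progress": 0-100}
#   GET  /result?job_id=...     once progress reaches 100         -> final payload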


@app.route('/progress', methods=['GET'])
def progress():
    job_id = request.args.get('job_id', '')
    if job_id not in jobs:
        return jsonify({"progress": 0})
    return jsonify({"progress": jobs[job_id].get("progress", 0)})


@app.route('/result', methods=['GET'])
def result():
    job_id = request.args.get('job_id', '')
    if job_id not in jobs or jobs[job_id].get("result") is None:
        return jsonify({"error": "Result not available."}), 404
    # Results are one-shot: the job entry is removed once fetched.
    result = jobs[job_id]["result"]
    del jobs[job_id]
    return jsonify(result)
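
# Quick smoke test from a shell (photo.jpg is a placeholder path):
#   curl -F "file=@photo.jpg" -F "object_type=box" http://localhost:7860/analyze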


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860, threaded=True)