import os import uuid import threading import time import cv2 import numpy as np import base64 from flask import Flask, render_template_string, request, redirect, flash, url_for, jsonify import roboflow import torch from collections import Counter app = Flask(__name__) app.secret_key = 'your_secret_key' # Replace with a secure secret key # Global dictionary to hold job progress and results jobs = {} # jobs[job_id] = {"progress": int, "result": {...}} ######################################### # 1. Initialize the Models ######################################### # --- Roboflow Box Detection Model --- API_KEY = "wLjPoPYaLmrqCIOFA0RH" # Your Roboflow API key PROJECT_ID = "base-model-box-r4suo-8lkk1-6dbqh" # Your Roboflow project ID VERSION_NUMBER = "2" # Your trained model version number try: rf = roboflow.Roboflow(api_key=API_KEY) workspace = rf.workspace() project = workspace.project(PROJECT_ID) version = project.version(VERSION_NUMBER) box_model = version.model # This model is trained for detecting boxes print("Roboflow model loaded successfully.") except Exception as e: print("Error initializing Roboflow model:", e) box_model = None # --- YOLOv5 Pretrained Model for Persons & Cars --- try: yolov5_model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True) print("YOLOv5 model loaded successfully.") except Exception as e: print("Error loading YOLOv5 model:", e) yolov5_model = None ######################################### # 2. Helper Functions ######################################### def compute_iou(boxA, boxB): xA = max(boxA[0], boxB[0]) yA = max(boxA[1], boxB[1]) xB = min(boxA[2], boxB[2]) yB = min(boxA[3], boxB[3]) interWidth = max(0, xB - xA) interHeight = max(0, yB - yA) interArea = interWidth * interHeight boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1]) boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1]) if boxAArea + boxBArea - interArea == 0: return 0 return interArea / float(boxAArea + boxBArea - interArea) # Lower the NMS threshold to 0.3 so that adjacent boxes are less likely to be merged. def custom_nms(preds, iou_threshold=0.3): preds = sorted(preds, key=lambda x: x["confidence"], reverse=True) filtered_preds = [] for pred in preds: keep = True for kept in filtered_preds: if compute_iou(pred["box"], kept["box"]) > iou_threshold: keep = False break if keep: filtered_preds.append(pred) return filtered_preds # The process_image function now uses: # - Roboflow prediction parameters: confidence=50 and a lower overlap=10. # - A custom NMS with IoU threshold of 0.3. # - ArUco marker detection for conversion factor computation. def process_image(job_id, image_path, object_type, multiplier): try: jobs[job_id]['progress'] = 10 # Load the original image image = cv2.imread(image_path) if image is None: jobs[job_id]['progress'] = 100 jobs[job_id]['result'] = {"error": "Could not read the image."} return jobs[job_id]['progress'] = 20 img_height, img_width = image.shape[:2] # Set dynamic thickness based on image size and multiplier. thickness = max(2, int(min(img_width, img_height) / 300)) * multiplier detection_info = [] if object_type == "box": if box_model is None: jobs[job_id]['progress'] = 100 jobs[job_id]['result'] = {"error": "Roboflow model not available."} return # --- BOX DETECTION --- # Upscale if the image is small. scale_factor = 1 if img_width < 1000 or img_height < 1000: scale_factor = 2 # Use improved parameters: confidence=50 and overlap=10 (lowered overlap). if scale_factor > 1: upscaled_image = cv2.resize(image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR) temp_path = "upscaled.jpg" cv2.imwrite(temp_path, upscaled_image) results = box_model.predict(temp_path, confidence=50, overlap=10).json() else: results = box_model.predict(image_path, confidence=50, overlap=10).json() predictions = results.get("predictions", []) processed_preds = [] for prediction in predictions: try: if scale_factor > 1: x = prediction["x"] / scale_factor y = prediction["y"] / scale_factor width = prediction["width"] / scale_factor height = prediction["height"] / scale_factor else: x = prediction["x"] y = prediction["y"] width = prediction["width"] height = prediction["height"] # Convert center-based coordinates to corner-based bounding box. x1 = int(round(x - width / 2)) y1 = int(round(y - height / 2)) x2 = int(round(x + width / 2)) y2 = int(round(y + height / 2)) # Clamp coordinates within the image. x1 = max(0, min(x1, img_width - 1)) y1 = max(0, min(y1, img_height - 1)) x2 = max(0, min(x2, img_width - 1)) y2 = max(0, min(y2, img_height - 1)) processed_preds.append({ "box": (x1, y1, x2, y2), "class": prediction["class"], "confidence": prediction["confidence"] }) except Exception as e: continue # Apply custom NMS with an IoU threshold of 0.3. box_detections = custom_nms(processed_preds, iou_threshold=0.3) jobs[job_id]['progress'] = 60 # --- ARUCO MARKER DETECTION & SIZE CONVERSION --- marker_real_width_cm = 5.0 # The printed marker is 5 cm x 5 cm. try: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250) if hasattr(cv2.aruco, 'DetectorParameters_create'): aruco_params = cv2.aruco.DetectorParameters_create() else: aruco_params = cv2.aruco.DetectorParameters() corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=aruco_params) if ids is not None and len(corners) > 0: marker_corners = corners[0].reshape((4, 2)) cv2.aruco.drawDetectedMarkers(image, corners, ids) # Compute the marker's bounding box. min_x = np.min(marker_corners[:, 0]) max_x = np.max(marker_corners[:, 0]) min_y = np.min(marker_corners[:, 1]) max_y = np.max(marker_corners[:, 1]) width_pixels = max_x - min_x height_pixels = max_y - min_y if width_pixels > 0 and height_pixels > 0: # Use the average conversion factor from width and height. conversion_factor = (marker_real_width_cm / width_pixels + marker_real_width_cm / height_pixels) / 2 else: conversion_factor = None else: conversion_factor = None except Exception as e: conversion_factor = None # --- Draw Boxes & Compute Sizes --- for pred in box_detections: x1, y1, x2, y2 = pred["box"] label = pred["class"] confidence = pred["confidence"] cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), thickness) if conversion_factor is not None: box_width_pixels = x2 - x1 box_height_pixels = y2 - y1 box_width_cm = box_width_pixels * conversion_factor box_height_cm = box_height_pixels * conversion_factor detection_info.append({ "class": label, "confidence": f"{confidence:.2f}", "width_cm": f"{box_width_cm:.1f}", "height_cm": f"{box_height_cm:.1f}" }) else: detection_info.append({ "class": label, "confidence": f"{confidence:.2f}", "width_cm": "N/A", "height_cm": "N/A" }) text = f"{label} ({confidence:.2f})" (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(image, (x1, y1 - text_height - baseline - 5), (x1 + text_width, y1 - 5), (0, 255, 0), -1) cv2.putText(image, text, (x1, y1 - 5 - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) elif object_type in {"person", "car"}: if yolov5_model is None: jobs[job_id]['progress'] = 100 jobs[job_id]['result'] = {"error": "YOLOv5 model not available."} return try: img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) yolo_results = yolov5_model(img_rgb) df = yolo_results.pandas().xyxy[0] for _, row in df.iterrows(): if row['name'] == object_type: xmin = int(row['xmin']) ymin = int(row['ymin']) xmax = int(row['xmax']) ymax = int(row['ymax']) conf = row['confidence'] label = row['name'] cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (255, 0, 0), thickness) text = f"{label} ({conf:.2f})" (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(image, (xmin, ymin - text_height - baseline - 5), (xmin + text_width, ymin - 5), (255, 0, 0), -1) cv2.putText(image, text, (xmin, ymin - 5 - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) detection_info.append({ "class": label, "confidence": f"{conf:.2f}", "width_cm": "N/A", "height_cm": "N/A" }) except Exception as e: jobs[job_id]['progress'] = 100 jobs[job_id]['result'] = {"error": "Error during YOLOv5 inference."} return # Draw summary text on the image detection_counts = Counter(det["class"] for det in detection_info) if detection_counts: top_text = ", ".join(f"{cls}: {count}" for cls, count in detection_counts.items()) (info_width, info_height), info_baseline = cv2.getTextSize(top_text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2) cv2.rectangle(image, (5, 5), (5 + info_width, 5 + info_height + info_baseline), (0, 255, 0), -1) cv2.putText(image, top_text, (5, 5 + info_height), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) jobs[job_id]['progress'] = 100 retval, buffer = cv2.imencode('.jpg', image) image_data = base64.b64encode(buffer).decode('utf-8') jobs[job_id]['result'] = {"image_data": image_data, "detection_info": detection_info} except Exception as e: jobs[job_id]['progress'] = 100 jobs[job_id]['result'] = {"error": "Unexpected error during processing."} ######################################### # 3. HTML Templates ######################################### landing_template = ''' MathLens

MathLens

What do you want to count?

People Cars Boxes
''' upload_template = ''' MathLens - AI Detection & Measurement Home
Starting up... 🛠️
{% if image_data or detection_info %}
Processed Image {% for det in detection_info %} {% endfor %}
# Class Confidence Width (cm) Height (cm)
{{ loop.index }} {{ det.class }} {{ det.confidence }} {{ det.width_cm }} {{ det.height_cm }}
{% endif %}
''' ######################################### # 4 Flask Routes ######################################### @app.route('/') def landing(): return render_template_string(landing_template) @app.route('/upload', methods=['GET']) def upload(): object_type = request.args.get('object_type', '').lower() if object_type not in {"person", "car", "box"}: flash("Please select a valid object type.") return redirect(url_for('landing')) return render_template_string(upload_template, object_type=object_type) @app.route('/analyze', methods=['POST']) def analyze(): if 'file' not in request.files: return jsonify({"error": "No file provided."}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No selected file."}), 400 object_type = request.form.get('object_type', '').lower() if object_type not in {"person", "car", "box"}: return jsonify({"error": "Invalid object type."}), 400 try: multiplier = int(request.form.get('multiplier', 1)) except ValueError: multiplier = 1 upload_path = "uploaded.jpg" try: file.save(upload_path) except Exception as e: return jsonify({"error": "Error saving file."}), 500 job_id = str(uuid.uuid4()) jobs[job_id] = {"progress": 0, "result": None} thread = threading.Thread(target=process_image, args=(job_id, upload_path, object_type, multiplier)) thread.start() return jsonify({"job_id": job_id}) @app.route('/progress', methods=['GET']) def progress(): job_id = request.args.get('job_id', '') if job_id not in jobs: return jsonify({"progress": 0}) return jsonify({"progress": jobs[job_id].get("progress", 0)}) @app.route('/result', methods=['GET']) def result(): job_id = request.args.get('job_id', '') if job_id not in jobs or jobs[job_id].get("result") is None: return jsonify({"error": "Result not available."}), 404 result = jobs[job_id]["result"] del jobs[job_id] return jsonify(result) ######################################### # 5. Run the App ######################################### if __name__ == '__main__': app.run(host="0.0.0.0", port=7860, threaded=True)