# processor.py import cv2 import numpy as np import os from ultralytics import YOLO from transformers import AutoProcessor, AutoModelForCausalLM, pipeline from PIL import Image, ImageDraw, ImageFont import re import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders import torch from dotenv import load_dotenv from transformers import AutoProcessor, AutoModel # Load environment variables load_dotenv() # Email credentials (Use environment variables for security) FROM_EMAIL = os.getenv("FROM_EMAIL") EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD") TO_EMAIL = os.getenv("TO_EMAIL") SMTP_SERVER = 'smtp.gmail.com' SMTP_PORT = 465 # Arabic dictionary for converting license plate text arabic_dict = { "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥", "6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب", "J": "ح", "D": "د", "R": "ر", "S": "س", "X": "ص", "T": "ط", "E": "ع", "G": "ق", "K": "ك", "L": "ل", "Z": "م", "N": "ن", "H": "ه", "U": "و", "V": "ي", " ": " " } # Define class colors class_colors = { 0: (0, 255, 0), # Green (Helmet) 1: (255, 0, 0), # Blue (License Plate) 2: (0, 0, 255), # Red (MotorbikeDelivery) 3: (255, 255, 0), # Cyan (MotorbikeSport) 4: (255, 0, 255), # Magenta (No Helmet) 5: (0, 255, 255), # Yellow (Person) } # Load the OCR model processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True) model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda') # Load YOLO model # Ensure the path to the model is correct model = YOLO('yolov8_Medium.pt') # Update the path as needed # Define lane area coordinates (example coordinates) red_lane = np.array([[2,1583],[1,1131],[1828,1141],[1912,1580]], np.int32) # Dictionary to track violations per license plate violations_dict = {} def filter_license_plate_text(license_plate_text): """Filter and format the license plate text.""" license_plate_text = re.sub(r'[^A-Z0-9]+', "", license_plate_text) match = re.search(r'(\d{4})\s*([A-Z]{2})', license_plate_text) return f"{match.group(1)} {match.group(2)}" if match else None def convert_to_arabic(license_plate_text): """Convert license plate text from Latin to Arabic script.""" return "".join(arabic_dict.get(char, char) for char in license_plate_text) def send_email(license_text, violation_image_path, violation_type): """Send an email notification with violation details and image attachment.""" # Define the subject and body based on violation type subjects = { 'No Helmet, In Red Lane': 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر', 'In Red Lane': 'تنبيه مخالفة: دخول المسار الأيسر', 'No Helmet': 'تنبيه مخالفة: عدم ارتداء خوذة' } bodies = { 'No Helmet, In Red Lane': f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة", 'In Red Lane': f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة", 'No Helmet': f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة" } subject = subjects.get(violation_type, 'تنبيه مخالفة') body = bodies.get(violation_type, f"تم تغريم دراجة نارية التي تحمل لوحة ({license_text}) بسبب مخالفة.") # Create the email message msg = MIMEMultipart() msg['From'] = FROM_EMAIL msg['To'] = TO_EMAIL msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) # Attach the violation image if os.path.exists(violation_image_path): with open(violation_image_path, 'rb') as attachment_file: part = MIMEBase('application', 'octet-stream') part.set_payload(attachment_file.read()) encoders.encode_base64(part) part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}') msg.attach(part) # Send the email using SMTP try: with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server: server.login(FROM_EMAIL, EMAIL_PASSWORD) server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string()) print("Email with attachment sent successfully!") except Exception as e: print(f"Failed to send email: {e}") def draw_text_pil(img, text, position, font_path, font_size, color): """Draw text on an image using PIL for better font support.""" img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) draw = ImageDraw.Draw(img_pil) try: font = ImageFont.truetype(font_path, size=font_size) except IOError: print(f"Font file not found at {font_path}. Using default font.") font = ImageFont.load_default() draw.text(position, text, font=font, fill=color) return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR) def process_frame(frame, font_path, violation_image_path='violation.jpg'): """Process a single video frame for violations.""" results = model.track(frame) for box in results[0].boxes: x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) label = model.names[int(box.cls)] color = class_colors.get(int(box.cls), (255, 255, 255)) confidence = box.conf[0].item() # Draw bounding box and label cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3) cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) if label == 'MotorbikeDelivery' and confidence >= 0.4: motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2] delivery_center = ((x1 + x2) // 2, y2) in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False) violation_types = [] if in_red_lane >= 0: violation_types.append("In Red Lane") # Detect sub-objects within the motorbike crop sub_results = model(motorbike_crop) for sub_box in sub_results[0].boxes: sub_x1, sub_y1, sub_x2, sub_y2 = map(int, sub_box.xyxy[0].cpu().numpy()) sub_label = model.names[int(sub_box.cls)] if sub_label == 'No_Helmet': violation_types.append("No Helmet") elif sub_label == 'License_plate': license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2] if violation_types: # Save violation image cv2.imwrite(violation_image_path, frame) license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB)) license_plate_pil.save('license_plate.png') # Perform OCR try: ocr_result = ocr_pipeline(Image.open('license_plate.png')) license_plate_text = ocr_result[0]['generated_text'] if ocr_result else "" except Exception as e: print(f"OCR failed: {e}") license_plate_text = "" filtered_text = filter_license_plate_text(license_plate_text) if filtered_text: if filtered_text not in violations_dict: violations_dict[filtered_text] = violation_types send_email(filtered_text, violation_image_path, ', '.join(violation_types)) else: current = set(violations_dict[filtered_text]) new = set(violation_types) updated = current | new if updated != current: violations_dict[filtered_text] = list(updated) send_email(filtered_text, violation_image_path, ', '.join(updated)) arabic_text = convert_to_arabic(filtered_text) frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, 30, (255, 255, 255)) frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, 30, (0, 255, 0)) return frame def process_image(image_path, font_path, violation_image_path='violation.jpg'): """Process an uploaded image and return the processed image.""" frame = cv2.imread(image_path) if frame is None: print("Error loading image") return None processed = process_frame(frame, font_path, violation_image_path) return processed def process_video(video_path, font_path, violation_image_path='violation.jpg'): """Process a video file and save the processed video.""" cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print("Error opening video file") return None fourcc = cv2.VideoWriter_fourcc(*'mp4v') output_video_path = 'output_violation.mp4' fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) while cap.isOpened(): ret, frame = cap.read() if not ret: break # Optionally draw red lane cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3) processed_frame = process_frame(frame, font_path, violation_image_path) out.write(processed_frame) cap.release() out.release() return output_video_path