import streamlit as st import cv2 import numpy as np import tempfile import time from ultralytics import YOLO from huggingface_hub import hf_hub_download from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders import os import smtplib from transformers import AutoModel, AutoProcessor from PIL import Image, ImageDraw, ImageFont import re import torch # Email credentials FROM_EMAIL = "Fares5675@gmail.com" EMAIL_PASSWORD = "cawxqifzqiwjufde" # App-Specific Password TO_EMAIL = "Fares5675@gmail.com" SMTP_SERVER = 'smtp.gmail.com' SMTP_PORT = 465 # Arabic dictionary for converting license plate text arabic_dict = { "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥", "6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب", "J": "ح", "D": "د", "R": "ر", "S": "س", "X": "ص", "T": "ط", "E": "ع", "G": "ق", "K": "ك", "L": "ل", "Z": "م", "N": "ن", "H": "ه", "U": "و", "V": "ي", " ": " " } # Color mapping for different classes class_colors = { 0: (0, 255, 0), # Green (Helmet) 1: (255, 0, 0), # Blue (License Plate) 2: (0, 0, 255), # Red (MotorbikeDelivery) 3: (255, 255, 0), # Cyan (MotorbikeSport) 4: (255, 0, 255), # Magenta (No Helmet) 5: (0, 255, 255), # Yellow (Person) } # Load the OCR model processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True) model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda') # YOLO inference function def run_yolo(image): results = model(image) return results # Function to process YOLO results and draw bounding boxes def process_results(results, image): boxes = results[0].boxes for box in boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) conf = box.conf[0] cls = int(box.cls[0]) label = model.names[cls] color = class_colors.get(cls, (255, 255, 255)) # Draw rectangle and label cv2.rectangle(image, (x1, y1), (x2, y2), color, 2) cv2.putText(image, f"{label} {conf:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) return image # Process uploaded images def process_image(uploaded_file): image = np.array(cv2.imdecode(np.frombuffer(uploaded_file.read(), np.uint8), 1)) results = run_yolo(image) processed_image = process_results(results, image) processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB) st.image(processed_image_rgb, caption='Detected Image', use_column_width=True) # Process and save uploaded videos @st.cache_data def process_video_and_save(uploaded_file): with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file: temp_file.write(uploaded_file.read()) temp_file_path = temp_file.name video = cv2.VideoCapture(temp_file_path) total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) frames = [] current_frame = 0 start_time = time.time() progress_bar = st.progress(0) progress_text = st.empty() while True: ret, frame = video.read() if not ret: break results = run_yolo(frame) processed_frame = process_results(results, frame) processed_frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB) frames.append(processed_frame_rgb) current_frame += 1 progress_percentage = int((current_frame / total_frames) * 100) progress_bar.progress(progress_percentage) progress_text.text(f"Processing frame {current_frame}/{total_frames} ({progress_percentage}%)") video.release() output_path = 'processed_video.mp4' height, width, _ = frames[0].shape out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), 30, (width, height)) for frame in frames: frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) out.write(frame_bgr) out.release() return output_path # Live video feed processing def live_video_feed(): stframe = st.empty() video = cv2.VideoCapture(0) start_time = time.time() while True: ret, frame = video.read() if not ret: break results = run_yolo(frame) processed_frame = process_results(results, frame) processed_frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB) stframe.image(processed_frame_rgb, channels="RGB", use_column_width=True) elapsed_time = time.time() - start_time st.write(f"Elapsed Time: {elapsed_time:.2f} seconds") if st.button("Stop"): break video.release() st.stop() # Function to filter license plate text def filter_license_plate_text(license_plate_text): license_plate_text = re.sub(r'[^A-Z0-9]+', "", license_plate_text) match = re.search(r'(\d{3,4})\s*([A-Z]{2})', license_plate_text) return f"{match.group(1)} {match.group(2)}" if match else None # Function to convert license plate text to Arabic def convert_to_arabic(license_plate_text): return "".join(arabic_dict.get(char, char) for char in license_plate_text) # Function to send email notification with image attachment def send_email(license_text, violation_image_path, violation_type): if violation_type == 'no_helmet': subject = 'تنبيه مخالفة: عدم ارتداء خوذة' body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة." elif violation_type == 'in_red_lane': subject = 'تنبيه مخالفة: دخول المسار الأيسر' body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة." elif violation_type == 'no_helmet_in_red_lane': subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر' body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة." msg = MIMEMultipart() msg['From'] = FROM_EMAIL msg['To'] = TO_EMAIL msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) if os.path.exists(violation_image_path): with open(violation_image_path, 'rb') as attachment_file: part = MIMEBase('application', 'octet-stream') part.set_payload(attachment_file.read()) encoders.encode_base64(part) part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}') msg.attach(part) with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server: server.login(FROM_EMAIL, EMAIL_PASSWORD) server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string()) print("Email with attachment sent successfully!") # Streamlit app main function def main(): model_file = hf_hub_download(repo_id="TheKnight115/Yolov8m", filename="yolov8_Medium.pt") global model model = YOLO(model_file) st.title("Motorbike Violation Detection") input_type = st.selectbox("Select Input Type", ("Image", "Video", "Live Feed")) if input_type == "Image": uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: process_image(uploaded_file) elif input_type == "Video": uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mov"]) if uploaded_file is not None: output_path = process_video_and_save(uploaded_file) st.video(output_path) elif input_type == "Live Feed": live_video_feed() if __name__ == "__main__": main()