# Page header captured along with the code: "Spaces: Sleeping".
import os
import re
import smtplib
import tempfile
import time
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import cv2
import numpy as np
import streamlit as st
import torch
from huggingface_hub import hf_hub_download
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoModel, AutoProcessor
from ultralytics import YOLO
# Email settings.
# Every value can be overridden through an environment variable of the same
# name, so real credentials do not have to live in the source tree. The
# literals below are kept only as backward-compatible defaults.
# SECURITY NOTE(review): the default password is committed in plain text; it
# should be rotated and the default removed once EMAIL_PASSWORD is supplied
# through the environment.
# NOTE(review): "[email protected]" looks like an obfuscation placeholder rather
# than a real address -- confirm the intended sender/recipient.
FROM_EMAIL = os.environ.get("FROM_EMAIL", "[email protected]")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "cawxqifzqiwjufde")  # App-Specific Password
TO_EMAIL = os.environ.get("TO_EMAIL", "[email protected]")
SMTP_SERVER = os.environ.get("SMTP_SERVER", 'smtp.gmail.com')
# Port is used with smtplib.SMTP_SSL in send_email; stored as an int.
SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))
# Arabic dictionary for converting license plate text.
# Maps each Latin digit/letter that can appear in a filtered plate string to
# the Arabic character used in its place; a space maps to itself.
arabic_dict = {
    # Digits
    "0": "٠",
    "1": "١",
    "2": "٢",
    "3": "٣",
    "4": "٤",
    "5": "٥",
    "6": "٦",
    "7": "٧",
    "8": "٨",
    "9": "٩",
    # Letters
    "A": "ا",
    "B": "ب",
    "J": "ح",
    "D": "د",
    "R": "ر",
    "S": "س",
    "X": "ص",
    "T": "ط",
    "E": "ع",
    "G": "ق",
    "K": "ك",
    "L": "ل",
    "Z": "م",
    "N": "ن",
    "H": "ه",
    "U": "و",
    "V": "ي",
    # Separator
    " ": " ",
}
# Color mapping for different classes.
# Keys are the YOLO class indices; values are colour tuples passed straight to
# the OpenCV drawing calls, so they are in BGR channel order.
class_colors = {
    0: (0, 255, 0),      # Green (Helmet)
    1: (255, 0, 0),      # Blue (License Plate)
    2: (0, 0, 255),      # Red (MotorbikeDelivery)
    3: (255, 255, 0),    # Cyan (MotorbikeSport)
    4: (255, 0, 255),    # Magenta (No Helmet)
    5: (0, 255, 255),    # Yellow (Person)
}
# Load the OCR model.
# Use the GPU when one is available and fall back to the CPU otherwise, so that
# starting the app does not crash on hosts without CUDA.
_OCR_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to(_OCR_DEVICE)
# YOLO inference function
def run_yolo(image):
    """Run the globally loaded YOLO model on ``image`` and return its results.

    The module-level ``model`` is assigned in ``main()``, so this must only be
    called after ``main()`` has loaded the weights.
    """
    return model(image)
# Function to process YOLO results and draw bounding boxes
def process_results(results, image):
    """Draw every detection of the first result onto ``image`` and return it.

    Args:
        results: Sequence returned by ``run_yolo``; only ``results[0]`` is used.
        image: Image array that is annotated in place.

    Returns:
        The same ``image`` object, with one rectangle and one
        "<class name> <confidence>" label per detected box.
    """
    for box in results[0].boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        conf = float(box.conf[0])
        cls = int(box.cls[0])
        label = model.names[cls]
        # Unknown class indices fall back to white.
        color = class_colors.get(cls, (255, 255, 255))

        # Draw rectangle and label
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        # Place the label just above the box; when the box touches the top of
        # the frame that position would be off-image, so draw it inside the box.
        label_y = y1 - 10 if y1 - 10 > 10 else y1 + 15
        cv2.putText(image, f"{label} {conf:.2f}", (x1, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return image
# Process uploaded images
def process_image(uploaded_file):
    """Decode an uploaded image, run detection on it and display the result.

    Args:
        uploaded_file: File-like object exposing ``read()`` that returns the
            encoded image bytes (as provided by ``st.file_uploader``).

    Shows an error message instead of running detection when the upload is
    empty or cannot be decoded as an image.
    """
    raw = np.frombuffer(uploaded_file.read(), np.uint8)
    # cv2.imdecode returns None for data it cannot decode; an empty buffer is
    # rejected up front so it never reaches the decoder.
    image = cv2.imdecode(raw, cv2.IMREAD_COLOR) if raw.size else None
    if image is None:
        st.error("Could not decode the uploaded image.")
        return

    results = run_yolo(image)
    processed_image = process_results(results, image)
    # OpenCV decodes to BGR; convert for display.
    processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB)
    st.image(processed_image_rgb, caption='Detected Image', use_column_width=True)
# Process and save uploaded videos
def process_video_and_save(uploaded_file):
    """Run detection on every frame of an uploaded video and save the result.

    The upload is spooled to a temporary file so OpenCV can open it, each frame
    is annotated and written straight to the output file (frames are not kept
    in memory), and the temporary file is removed afterwards.

    Args:
        uploaded_file: File-like object exposing ``read()`` that returns the
            encoded video bytes.

    Returns:
        Path of the annotated video ('processed_video.mp4').

    Raises:
        ValueError: If no frame could be read from the upload.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
        temp_file.write(uploaded_file.read())
        temp_file_path = temp_file.name

    output_path = 'processed_video.mp4'
    video = cv2.VideoCapture(temp_file_path)
    writer = None
    current_frame = 0
    try:
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video.get(cv2.CAP_PROP_FPS)
        # Some containers report 0/NaN; keep the previous fixed rate then.
        if not fps > 0:
            fps = 30

        progress_bar = st.progress(0)
        progress_text = st.empty()

        while True:
            ret, frame = video.read()
            if not ret:
                break

            results = run_yolo(frame)
            processed_frame = process_results(results, frame)

            # The writer needs the frame size, so create it on the first frame.
            if writer is None:
                height, width = processed_frame.shape[:2]
                writer = cv2.VideoWriter(
                    output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)
                )
            writer.write(processed_frame)

            current_frame += 1
            if total_frames > 0:
                # The reported frame count can be lower than the real one;
                # clamp so the progress bar never receives a value above 100.
                progress_percentage = min(int((current_frame / total_frames) * 100), 100)
                progress_bar.progress(progress_percentage)
                progress_text.text(
                    f"Processing frame {current_frame}/{total_frames} ({progress_percentage}%)"
                )
            else:
                # Frame count unknown: report the running count only.
                progress_text.text(f"Processing frame {current_frame}")
    finally:
        video.release()
        if writer is not None:
            writer.release()
        try:
            os.remove(temp_file_path)
        except OSError:
            # Best-effort cleanup; a leftover temp file must not mask the result.
            pass

    if current_frame == 0:
        raise ValueError("No frames could be read from the uploaded video.")
    return output_path
# Live video feed processing
def live_video_feed():
    """Show annotated frames from the default camera until stopped.

    The "Stop" button and the output placeholders are created once, before the
    capture loop: creating the same button on every iteration would register a
    duplicate widget. Pressing the button makes Streamlit rerun the script, in
    which case this function sees the button as pressed and stops immediately.
    The camera is released in ``finally`` so it is freed even when the current
    run is interrupted.
    """
    stframe = st.empty()
    elapsed_placeholder = st.empty()
    if st.button("Stop"):
        st.stop()

    video = cv2.VideoCapture(0)
    start_time = time.time()
    try:
        if not video.isOpened():
            st.error("Could not open the camera.")
        while True:
            ret, frame = video.read()
            if not ret:
                break

            results = run_yolo(frame)
            processed_frame = process_results(results, frame)
            processed_frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB)
            stframe.image(processed_frame_rgb, channels="RGB", use_column_width=True)

            elapsed_time = time.time() - start_time
            # Update one placeholder instead of appending a new line per frame.
            elapsed_placeholder.write(f"Elapsed Time: {elapsed_time:.2f} seconds")
    finally:
        video.release()
    st.stop()
# Function to filter license plate text
def filter_license_plate_text(license_plate_text):
    """Extract a "<digits> <letters>" plate string from raw OCR text.

    Every character other than ``A-Z`` and ``0-9`` is removed first (this also
    drops lowercase letters and whitespace). The first run of 3-4 digits that
    is directly followed by two uppercase letters is then returned.

    Args:
        license_plate_text: Raw text, or ``None``.

    Returns:
        The plate formatted as ``"<digits> <letters>"``, or ``None`` when the
        input is empty/``None`` or contains no such pattern.
    """
    if not license_plate_text:
        return None
    cleaned = re.sub(r'[^A-Z0-9]+', "", license_plate_text)
    match = re.search(r'(\d{3,4})\s*([A-Z]{2})', cleaned)
    if match is None:
        return None
    return f"{match.group(1)} {match.group(2)}"
# Function to convert license plate text to Arabic
def convert_to_arabic(license_plate_text):
    """Replace each character of the plate text using ``arabic_dict``.

    Characters without an entry in ``arabic_dict`` are kept unchanged.

    Args:
        license_plate_text: Plate text, or ``None`` (which is what
            ``filter_license_plate_text`` returns when nothing matched).

    Returns:
        The converted string; an empty string for empty/``None`` input.
    """
    if not license_plate_text:
        return ""
    return "".join(arabic_dict.get(char, char) for char in license_plate_text)
# Function to send email notification with image attachment
def send_email(license_text, violation_type_image_path=None, violation_type=None, *, violation_image_path=None):
    raise NotImplementedError
# Streamlit app main function
def main():
    """Load the YOLO weights and render the Streamlit UI.

    The loaded model is stored in the module-level ``model`` used by
    ``run_yolo`` and ``process_results``. The user picks an input type (image,
    video or live feed) and the matching processing function is called.
    """
    global model

    model_file = hf_hub_download(repo_id="TheKnight115/Yolov8m", filename="yolov8_Medium.pt")
    model = YOLO(model_file)

    st.title("Motorbike Violation Detection")
    input_type = st.selectbox("Select Input Type", ("Image", "Video", "Live Feed"))

    if input_type == "Image":
        uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            process_image(uploaded_file)
    elif input_type == "Video":
        uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mov"])
        if uploaded_file is not None:
            output_path = process_video_and_save(uploaded_file)
            st.video(output_path)
    elif input_type == "Live Feed":
        live_video_feed()
# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()