Spaces:
Running
Running
TheKnight115
commited on
Create processor.py
Browse files- processor.py +294 -0
processor.py
ADDED
@@ -0,0 +1,294 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
# processor.py
|
2 |
+
|
3 |
+
import cv2
|
4 |
+
import numpy as np
|
5 |
+
import smtplib
|
6 |
+
from email.mime.text import MIMEText
|
7 |
+
from email.mime.multipart import MIMEMultipart
|
8 |
+
from email.mime.base import MIMEBase
|
9 |
+
from email import encoders
|
10 |
+
import os
|
11 |
+
from ultralytics import YOLO
|
12 |
+
from transformers import AutoModel, AutoProcessor
|
13 |
+
from PIL import Image, ImageDraw, ImageFont
|
14 |
+
import re
|
15 |
+
import torch
|
16 |
+
|
17 |
+
# Email credentials (replace with your App-Specific Password)
|
18 |
+
FROM_EMAIL = "[email protected]"
|
19 |
+
EMAIL_PASSWORD = "cawxqifzqiwjufde" # Use App-Specific Password here
|
20 |
+
TO_EMAIL = "[email protected]"
|
21 |
+
SMTP_SERVER = 'smtp.gmail.com'
|
22 |
+
SMTP_PORT = 465
|
23 |
+
|
24 |
+
# Arabic dictionary for converting license plate text
|
25 |
+
arabic_dict = {
|
26 |
+
"0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥",
|
27 |
+
"6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب",
|
28 |
+
"J": "ح", "D": "د", "R": "ر", "S": "س", "X": "ص", "T": "ط",
|
29 |
+
"E": "ع", "G": "ق", "K": "ك", "L": "ل", "Z": "م", "N": "ن",
|
30 |
+
"H": "ه", "U": "و", "V": "ي", " ": " "
|
31 |
+
}
|
32 |
+
class_colors = {
|
33 |
+
0: (0, 255, 0), # Green (Helmet)
|
34 |
+
1: (255, 0, 0), # Blue (License Plate)
|
35 |
+
2: (0, 0, 255), # Red (MotorbikeDelivery)
|
36 |
+
3: (255, 255, 0), # Cyan (MotorbikeSport)
|
37 |
+
4: (255, 0, 255), # Magenta (No Helmet)
|
38 |
+
5: (0, 255, 255), # Yellow (Person)
|
39 |
+
}
|
40 |
+
|
41 |
+
# Load the OCR model
|
42 |
+
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
|
43 |
+
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')
|
44 |
+
|
45 |
+
# Load YOLO model
|
46 |
+
model = YOLO('yolov8_Medium.pt') # Update the path as needed
|
47 |
+
|
48 |
+
# Define lane area coordinates (example coordinates)
|
49 |
+
red_lane = np.array([[2,1583],[1,1131],[1828,1141],[1912,1580]], np.int32)
|
50 |
+
|
51 |
+
# Dictionary to track violations per license plate
|
52 |
+
violations_dict = {}
|
53 |
+
|
54 |
+
def filter_license_plate_text(license_plate_text):
|
55 |
+
license_plate_text = re.sub(r'[^A-Z0-9]+', "", license_plate_text)
|
56 |
+
match = re.search(r'(\d{4})\s*([A-Z]{2})', license_plate_text)
|
57 |
+
return f"{match.group(1)} {match.group(2)}" if match else None
|
58 |
+
|
59 |
+
def convert_to_arabic(license_plate_text):
|
60 |
+
return "".join(arabic_dict.get(char, char) for char in license_plate_text)
|
61 |
+
|
62 |
+
def send_email(license_text, violation_image_path, violation_type):
|
63 |
+
# Define the subject and body based on violation type
|
64 |
+
if violation_type == 'No Helmet, In Red Lane':
|
65 |
+
subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر'
|
66 |
+
body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
|
67 |
+
|
68 |
+
elif violation_type == 'In Red Lane':
|
69 |
+
subject = 'تنبيه مخالفة: دخول المسار الأيسر'
|
70 |
+
body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
|
71 |
+
else:
|
72 |
+
violation_type == 'No Helmet'
|
73 |
+
subject = 'تنبيه مخالفة: عدم ارتداء خوذة'
|
74 |
+
body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
|
75 |
+
|
76 |
+
# Create the email message
|
77 |
+
msg = MIMEMultipart()
|
78 |
+
msg['From'] = FROM_EMAIL
|
79 |
+
msg['To'] = TO_EMAIL
|
80 |
+
msg['Subject'] = subject
|
81 |
+
msg.attach(MIMEText(body, 'plain'))
|
82 |
+
|
83 |
+
# Attach the violation image
|
84 |
+
if os.path.exists(violation_image_path):
|
85 |
+
with open(violation_image_path, 'rb') as attachment_file:
|
86 |
+
part = MIMEBase('application', 'octet-stream')
|
87 |
+
part.set_payload(attachment_file.read())
|
88 |
+
encoders.encode_base64(part)
|
89 |
+
part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}')
|
90 |
+
msg.attach(part)
|
91 |
+
|
92 |
+
# Send the email using SMTP
|
93 |
+
try:
|
94 |
+
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
|
95 |
+
server.login(FROM_EMAIL, EMAIL_PASSWORD)
|
96 |
+
server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
|
97 |
+
print("Email with attachment sent successfully!")
|
98 |
+
except Exception as e:
|
99 |
+
print(f"Failed to send email: {e}")
|
100 |
+
|
101 |
+
def draw_text_pil(img, text, position, font_path, font_size, color):
|
102 |
+
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
|
103 |
+
|
104 |
+
draw = ImageDraw.Draw(img_pil)
|
105 |
+
|
106 |
+
try:
|
107 |
+
font = ImageFont.truetype(font_path, size=font_size)
|
108 |
+
except IOError:
|
109 |
+
print(f"Font file not found at {font_path}. Using default font.")
|
110 |
+
font = ImageFont.load_default()
|
111 |
+
|
112 |
+
draw.text(position, text, font=font, fill=color)
|
113 |
+
|
114 |
+
img_np = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
|
115 |
+
return img_np
|
116 |
+
|
117 |
+
def process_video(video_path, font_path, violation_image_path='violation.jpg'):
|
118 |
+
cap = cv2.VideoCapture(video_path)
|
119 |
+
|
120 |
+
if not cap.isOpened():
|
121 |
+
print("Error opening video file")
|
122 |
+
return None
|
123 |
+
|
124 |
+
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
125 |
+
output_video_path = 'output_violation.mp4'
|
126 |
+
fps = cap.get(cv2.CAP_PROP_FPS)
|
127 |
+
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
128 |
+
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
129 |
+
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
|
130 |
+
|
131 |
+
margin_y = 50
|
132 |
+
|
133 |
+
while cap.isOpened():
|
134 |
+
ret, frame = cap.read()
|
135 |
+
if not ret:
|
136 |
+
break
|
137 |
+
|
138 |
+
cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)
|
139 |
+
|
140 |
+
results = model.track(frame)
|
141 |
+
|
142 |
+
for box in results[0].boxes:
|
143 |
+
x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
|
144 |
+
label = model.names[int(box.cls)]
|
145 |
+
color = class_colors.get(int(box.cls), (255, 255, 255))
|
146 |
+
confidence = box.conf[0].item()
|
147 |
+
|
148 |
+
helmet_violation = False
|
149 |
+
lane_violation = False
|
150 |
+
violation_type = []
|
151 |
+
|
152 |
+
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
|
153 |
+
cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
|
154 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
|
155 |
+
|
156 |
+
if label == 'MotorbikeDelivery' and confidence >= 0.4:
|
157 |
+
motorbike_crop = frame[max(0, y1 - margin_y):y2, x1:x2]
|
158 |
+
delivery_center = ((x1 + x2) // 2, y2)
|
159 |
+
in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
|
160 |
+
if in_red_lane >= 0:
|
161 |
+
lane_violation = True
|
162 |
+
violation_type.append("In Red Lane")
|
163 |
+
|
164 |
+
sub_results = model(motorbike_crop)
|
165 |
+
|
166 |
+
for result in sub_results[0].boxes:
|
167 |
+
sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
|
168 |
+
sub_label = model.names[int(result.cls)]
|
169 |
+
sub_color = (255, 0, 0)
|
170 |
+
|
171 |
+
cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
|
172 |
+
cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
|
173 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)
|
174 |
+
|
175 |
+
if sub_label == 'No_Helmet':
|
176 |
+
helmet_violation = True
|
177 |
+
violation_type.append("No Helmet")
|
178 |
+
continue
|
179 |
+
if sub_label == 'License_plate':
|
180 |
+
license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
|
181 |
+
|
182 |
+
if helmet_violation or lane_violation:
|
183 |
+
cv2.imwrite(violation_image_path, frame)
|
184 |
+
license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
|
185 |
+
temp_image_path = 'license_plate.png'
|
186 |
+
license_plate_pil.save(temp_image_path)
|
187 |
+
# Placeholder for OCR
|
188 |
+
# license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
|
189 |
+
# For demonstration, we'll mock the OCR result
|
190 |
+
license_plate_text = "1234AB"
|
191 |
+
filtered_text = filter_license_plate_text(license_plate_text)
|
192 |
+
|
193 |
+
if filtered_text:
|
194 |
+
if filtered_text not in violations_dict:
|
195 |
+
violations_dict[filtered_text] = violation_type
|
196 |
+
send_email(filtered_text, violation_image_path, ', '.join(violation_type))
|
197 |
+
else:
|
198 |
+
current_violations = set(violations_dict[filtered_text])
|
199 |
+
new_violations = set(violation_type)
|
200 |
+
updated_violations = list(current_violations | new_violations)
|
201 |
+
|
202 |
+
if updated_violations != violations_dict[filtered_text]:
|
203 |
+
violations_dict[filtered_text] = updated_violations
|
204 |
+
send_email(filtered_text, violation_image_path, ', '.join(updated_violations))
|
205 |
+
|
206 |
+
arabic_text = convert_to_arabic(filtered_text)
|
207 |
+
frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, font_size=30, color=(255, 255, 255))
|
208 |
+
frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, font_size=30, color=(0, 255, 0))
|
209 |
+
|
210 |
+
out.write(frame)
|
211 |
+
|
212 |
+
cap.release()
|
213 |
+
out.release()
|
214 |
+
return output_video_path
|
215 |
+
|
216 |
+
def process_image(image_path, font_path, violation_image_path='violation.jpg'):
|
217 |
+
frame = cv2.imread(image_path)
|
218 |
+
if frame is None:
|
219 |
+
print("Error loading image")
|
220 |
+
return None
|
221 |
+
|
222 |
+
cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)
|
223 |
+
|
224 |
+
results = model.track(frame)
|
225 |
+
|
226 |
+
for box in results[0].boxes:
|
227 |
+
x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
|
228 |
+
label = model.names[int(box.cls)]
|
229 |
+
color = class_colors.get(int(box.cls), (255, 255, 255))
|
230 |
+
confidence = box.conf[0].item()
|
231 |
+
|
232 |
+
helmet_violation = False
|
233 |
+
lane_violation = False
|
234 |
+
violation_type = []
|
235 |
+
|
236 |
+
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
|
237 |
+
cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
|
238 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
|
239 |
+
|
240 |
+
if label == 'MotorbikeDelivery' and confidence >= 0.4:
|
241 |
+
motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2]
|
242 |
+
delivery_center = ((x1 + x2) // 2, y2)
|
243 |
+
in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
|
244 |
+
if in_red_lane >= 0:
|
245 |
+
lane_violation = True
|
246 |
+
violation_type.append("In Red Lane")
|
247 |
+
|
248 |
+
sub_results = model(motorbike_crop)
|
249 |
+
|
250 |
+
for result in sub_results[0].boxes:
|
251 |
+
sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
|
252 |
+
sub_label = model.names[int(result.cls)]
|
253 |
+
sub_color = (255, 0, 0)
|
254 |
+
|
255 |
+
cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
|
256 |
+
cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
|
257 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)
|
258 |
+
|
259 |
+
if sub_label == 'No_Helmet':
|
260 |
+
helmet_violation = True
|
261 |
+
violation_type.append("No Helmet")
|
262 |
+
continue
|
263 |
+
if sub_label == 'License_plate':
|
264 |
+
license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
|
265 |
+
|
266 |
+
if helmet_violation or lane_violation:
|
267 |
+
cv2.imwrite(violation_image_path, frame)
|
268 |
+
license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
|
269 |
+
temp_image_path = 'license_plate.png'
|
270 |
+
license_plate_pil.save(temp_image_path)
|
271 |
+
# Placeholder for OCR
|
272 |
+
# license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
|
273 |
+
# For demonstration, we'll mock the OCR result
|
274 |
+
license_plate_text = "1234AB"
|
275 |
+
filtered_text = filter_license_plate_text(license_plate_text)
|
276 |
+
|
277 |
+
if filtered_text:
|
278 |
+
if filtered_text not in violations_dict:
|
279 |
+
violations_dict[filtered_text] = violation_type
|
280 |
+
send_email(filtered_text, violation_image_path, ', '.join(violation_type))
|
281 |
+
else:
|
282 |
+
current_violations = set(violations_dict[filtered_text])
|
283 |
+
new_violations = set(violation_type)
|
284 |
+
updated_violations = list(current_violations | new_violations)
|
285 |
+
|
286 |
+
if updated_violations != violations_dict[filtered_text]:
|
287 |
+
violations_dict[filtered_text] = updated_violations
|
288 |
+
send_email(filtered_text, violation_image_path, ', '.join(updated_violations))
|
289 |
+
|
290 |
+
arabic_text = convert_to_arabic(filtered_text)
|
291 |
+
frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, font_size=30, color=(255, 255, 255))
|
292 |
+
frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, font_size=30, color=(0, 255, 0))
|
293 |
+
|
294 |
+
return frame
|