TheKnight115 commited on
Commit
1289698
·
verified ·
1 Parent(s): 1cf4588

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +57 -198
processor.py CHANGED
@@ -2,26 +2,25 @@
2
 
3
  import cv2
4
  import numpy as np
5
- import smtplib
6
- from email.mime.text import MIMEText
7
- from email.mime.multipart import MIMEMultipart
8
- from email.mime.base import MIMEBase
9
- from email import encoders
10
  import os
11
  from ultralytics import YOLO
12
  from transformers import AutoModel, AutoProcessor
13
  from PIL import Image, ImageDraw, ImageFont
14
  import re
15
- import torch
 
 
 
 
16
 
17
- # Email credentials (replace with your App-Specific Password)
18
- FROM_EMAIL = "Fares5675@gmail.com"
19
- EMAIL_PASSWORD = "cawxqifzqiwjufde" # Use App-Specific Password here
20
- TO_EMAIL = "kduwaysan@gmail.com"
21
  SMTP_SERVER = 'smtp.gmail.com'
22
  SMTP_PORT = 465
23
 
24
- # Arabic dictionary for converting license plate text
25
  arabic_dict = {
26
  "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥",
27
  "6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب",
@@ -38,49 +37,44 @@ class_colors = {
38
  5: (0, 255, 255), # Yellow (Person)
39
  }
40
 
41
- # Load the OCR model
42
  processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
43
  model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')
 
44
 
45
- # Load YOLO model
46
- model = YOLO('yolov8_Medium.pt') # Update the path as needed
47
-
48
- # Define lane area coordinates (example coordinates)
49
  red_lane = np.array([[2,1583],[1,1131],[1828,1141],[1912,1580]], np.int32)
50
 
51
- # Dictionary to track violations per license plate
52
  violations_dict = {}
53
 
54
- def filter_license_plate_text(license_plate_text):
55
- license_plate_text = re.sub(r'[^A-Z0-9]+', "", license_plate_text)
56
- match = re.search(r'(\d{4})\s*([A-Z]{2})', license_plate_text)
57
  return f"{match.group(1)} {match.group(2)}" if match else None
58
 
59
- def convert_to_arabic(license_plate_text):
60
- return "".join(arabic_dict.get(char, char) for char in license_plate_text)
 
 
 
 
 
 
 
 
 
 
 
 
 
61
 
62
- def send_email(license_text, violation_image_path, violation_type):
63
- # Define the subject and body based on violation type
64
- if violation_type == 'No Helmet, In Red Lane':
65
- subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر'
66
- body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
67
-
68
- elif violation_type == 'In Red Lane':
69
- subject = 'تنبيه مخالفة: دخول المسار الأيسر'
70
- body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
71
- else:
72
- violation_type == 'No Helmet'
73
- subject = 'تنبيه مخالفة: عدم ارتداء خوذة'
74
- body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
75
-
76
- # Create the email message
77
  msg = MIMEMultipart()
78
  msg['From'] = FROM_EMAIL
79
  msg['To'] = TO_EMAIL
80
- msg['Subject'] = subject
81
  msg.attach(MIMEText(body, 'plain'))
82
 
83
- # Attach the violation image
84
  if os.path.exists(violation_image_path):
85
  with open(violation_image_path, 'rb') as attachment_file:
86
  part = MIMEBase('application', 'octet-stream')
@@ -89,200 +83,65 @@ def send_email(license_text, violation_image_path, violation_type):
89
  part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}')
90
  msg.attach(part)
91
 
92
- # Send the email using SMTP
93
  try:
94
  with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
95
  server.login(FROM_EMAIL, EMAIL_PASSWORD)
96
  server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
97
- print("Email with attachment sent successfully!")
98
  except Exception as e:
99
  print(f"Failed to send email: {e}")
100
 
101
  def draw_text_pil(img, text, position, font_path, font_size, color):
102
  img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
103
-
104
  draw = ImageDraw.Draw(img_pil)
105
-
106
  try:
107
  font = ImageFont.truetype(font_path, size=font_size)
108
  except IOError:
109
- print(f"Font file not found at {font_path}. Using default font.")
110
  font = ImageFont.load_default()
111
-
112
  draw.text(position, text, font=font, fill=color)
113
-
114
- img_np = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
115
- return img_np
116
-
117
- def process_video(video_path, font_path, violation_image_path='violation.jpg'):
118
- cap = cv2.VideoCapture(video_path)
119
-
120
- if not cap.isOpened():
121
- print("Error opening video file")
122
- return None
123
-
124
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
125
- output_video_path = 'output_violation.mp4'
126
- fps = cap.get(cv2.CAP_PROP_FPS)
127
- width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
128
- height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
129
- out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
130
-
131
- margin_y = 50
132
-
133
- while cap.isOpened():
134
- ret, frame = cap.read()
135
- if not ret:
136
- break
137
-
138
- cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)
139
-
140
- results = model.track(frame)
141
-
142
- for box in results[0].boxes:
143
- x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
144
- label = model.names[int(box.cls)]
145
- color = class_colors.get(int(box.cls), (255, 255, 255))
146
- confidence = box.conf[0].item()
147
-
148
- helmet_violation = False
149
- lane_violation = False
150
- violation_type = []
151
-
152
- cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
153
- cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
154
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
155
-
156
- if label == 'MotorbikeDelivery' and confidence >= 0.4:
157
- motorbike_crop = frame[max(0, y1 - margin_y):y2, x1:x2]
158
- delivery_center = ((x1 + x2) // 2, y2)
159
- in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
160
- if in_red_lane >= 0:
161
- lane_violation = True
162
- violation_type.append("In Red Lane")
163
-
164
- sub_results = model(motorbike_crop)
165
-
166
- for result in sub_results[0].boxes:
167
- sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
168
- sub_label = model.names[int(result.cls)]
169
- sub_color = (255, 0, 0)
170
-
171
- cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
172
- cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
173
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)
174
-
175
- if sub_label == 'No_Helmet':
176
- helmet_violation = True
177
- violation_type.append("No Helmet")
178
- continue
179
- if sub_label == 'License_plate':
180
- license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
181
-
182
- if helmet_violation or lane_violation:
183
- cv2.imwrite(violation_image_path, frame)
184
- license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
185
- temp_image_path = 'license_plate.png'
186
- license_plate_pil.save(temp_image_path)
187
- license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
188
- filtered_text = filter_license_plate_text(license_plate_text)
189
-
190
- if filtered_text:
191
- if filtered_text not in violations_dict:
192
- violations_dict[filtered_text] = violation_type
193
- send_email(filtered_text, violation_image_path, ', '.join(violation_type))
194
- else:
195
- current_violations = set(violations_dict[filtered_text])
196
- new_violations = set(violation_type)
197
- updated_violations = list(current_violations | new_violations)
198
-
199
- if updated_violations != violations_dict[filtered_text]:
200
- violations_dict[filtered_text] = updated_violations
201
- send_email(filtered_text, violation_image_path, ', '.join(updated_violations))
202
-
203
- arabic_text = convert_to_arabic(filtered_text)
204
- frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, font_size=30, color=(255, 255, 255))
205
- frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, font_size=30, color=(0, 255, 0))
206
-
207
- out.write(frame)
208
-
209
- cap.release()
210
- out.release()
211
- return output_video_path
212
-
213
- def process_image(image_path, font_path, violation_image_path='violation.jpg'):
214
- frame = cv2.imread(image_path)
215
- if frame is None:
216
- print("Error loading image")
217
- return None
218
-
219
- cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)
220
 
 
221
  results = model.track(frame)
222
-
223
  for box in results[0].boxes:
224
- x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
225
- label = model.names[int(box.cls)]
226
  color = class_colors.get(int(box.cls), (255, 255, 255))
227
  confidence = box.conf[0].item()
228
 
229
- helmet_violation = False
230
- lane_violation = False
231
- violation_type = []
232
-
233
- cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
234
- cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
235
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
236
-
237
  if label == 'MotorbikeDelivery' and confidence >= 0.4:
238
  motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2]
239
  delivery_center = ((x1 + x2) // 2, y2)
240
  in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
 
241
  if in_red_lane >= 0:
242
- lane_violation = True
243
- violation_type.append("In Red Lane")
244
 
245
  sub_results = model(motorbike_crop)
246
-
247
- for result in sub_results[0].boxes:
248
- sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
249
- sub_label = model.names[int(result.cls)]
250
- sub_color = (255, 0, 0)
251
-
252
- cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
253
- cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
254
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)
255
-
256
  if sub_label == 'No_Helmet':
257
- helmet_violation = True
258
- violation_type.append("No Helmet")
259
- continue
260
- if sub_label == 'License_plate':
261
  license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
262
-
263
- if helmet_violation or lane_violation:
264
  cv2.imwrite(violation_image_path, frame)
265
  license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
266
- temp_image_path = 'license_plate.png'
267
- license_plate_pil.save(temp_image_path)
268
  license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
269
  filtered_text = filter_license_plate_text(license_plate_text)
270
-
271
  if filtered_text:
272
  if filtered_text not in violations_dict:
273
- violations_dict[filtered_text] = violation_type
274
- send_email(filtered_text, violation_image_path, ', '.join(violation_type))
275
  else:
276
- current_violations = set(violations_dict[filtered_text])
277
- new_violations = set(violation_type)
278
- updated_violations = list(current_violations | new_violations)
279
-
280
- if updated_violations != violations_dict[filtered_text]:
281
- violations_dict[filtered_text] = updated_violations
282
- send_email(filtered_text, violation_image_path, ', '.join(updated_violations))
283
-
284
  arabic_text = convert_to_arabic(filtered_text)
285
- frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, font_size=30, color=(255, 255, 255))
286
- frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, font_size=30, color=(0, 255, 0))
287
-
288
- return frame
 
2
 
3
  import cv2
4
  import numpy as np
 
 
 
 
 
5
  import os
6
  from ultralytics import YOLO
7
  from transformers import AutoModel, AutoProcessor
8
  from PIL import Image, ImageDraw, ImageFont
9
  import re
10
+ import smtplib
11
+ from email.mime.text import MIMEText
12
+ from email.mime.multipart import MIMEMultipart
13
+ from email.mime.base import MIMEBase
14
+ from email import encoders
15
 
16
+ # Email credentials (Use environment variables for security)
17
+ FROM_EMAIL = os.getenv("FROM_EMAIL")
18
+ EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")
19
+ TO_EMAIL = os.getenv("TO_EMAIL")
20
  SMTP_SERVER = 'smtp.gmail.com'
21
  SMTP_PORT = 465
22
 
23
+ # Arabic dictionary
24
  arabic_dict = {
25
  "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥",
26
  "6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب",
 
37
  5: (0, 255, 255), # Yellow (Person)
38
  }
39
 
40
+ # Load models
41
  processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
42
  model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')
43
+ model = YOLO('yolov8_Medium.pt') # Update path as needed
44
 
45
+ # Define lane area
 
 
 
46
  red_lane = np.array([[2,1583],[1,1131],[1828,1141],[1912,1580]], np.int32)
47
 
48
+ # Violation tracking
49
  violations_dict = {}
50
 
51
+ def filter_license_plate_text(text):
52
+ text = re.sub(r'[^A-Z0-9]+', "", text)
53
+ match = re.search(r'(\d{4})\s*([A-Z]{2})', text)
54
  return f"{match.group(1)} {match.group(2)}" if match else None
55
 
56
+ def convert_to_arabic(text):
57
+ return "".join(arabic_dict.get(char, char) for char in text)
58
+
59
+ def send_email(license_text, violation_image_path, violation_type):
60
+ subject = {
61
+ 'No Helmet, In Red Lane': 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر',
62
+ 'In Red Lane': 'تنبيه مخالفة: دخول المسار الأيسر',
63
+ 'No Helmet': 'تنبيه مخالفة: عدم ارتداء خوذة'
64
+ }.get(violation_type, 'تنبيه مخالفة')
65
+
66
+ body = {
67
+ 'No Helmet, In Red Lane': f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
68
+ 'In Red Lane': f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة",
69
+ 'No Helmet': f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة"
70
+ }.get(violation_type, f"تم تغريم دراجة نارية التي تحمل لوحة ({license_text})")
71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  msg = MIMEMultipart()
73
  msg['From'] = FROM_EMAIL
74
  msg['To'] = TO_EMAIL
75
+ msg['Subject'] = subject
76
  msg.attach(MIMEText(body, 'plain'))
77
 
 
78
  if os.path.exists(violation_image_path):
79
  with open(violation_image_path, 'rb') as attachment_file:
80
  part = MIMEBase('application', 'octet-stream')
 
83
  part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}')
84
  msg.attach(part)
85
 
 
86
  try:
87
  with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
88
  server.login(FROM_EMAIL, EMAIL_PASSWORD)
89
  server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
 
90
  except Exception as e:
91
  print(f"Failed to send email: {e}")
92
 
93
  def draw_text_pil(img, text, position, font_path, font_size, color):
94
  img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
 
95
  draw = ImageDraw.Draw(img_pil)
 
96
  try:
97
  font = ImageFont.truetype(font_path, size=font_size)
98
  except IOError:
 
99
  font = ImageFont.load_default()
 
100
  draw.text(position, text, font=font, fill=color)
101
+ return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
102
 
103
+ def process_frame(frame, font_path, violation_image_path='violation.jpg'):
104
  results = model.track(frame)
 
105
  for box in results[0].boxes:
106
+ x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
107
+ label = model.names[int(box.cls)]
108
  color = class_colors.get(int(box.cls), (255, 255, 255))
109
  confidence = box.conf[0].item()
110
 
 
 
 
 
 
 
 
 
111
  if label == 'MotorbikeDelivery' and confidence >= 0.4:
112
  motorbike_crop = frame[max(0, y1 - 50):y2, x1:x2]
113
  delivery_center = ((x1 + x2) // 2, y2)
114
  in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
115
+ violation_types = []
116
  if in_red_lane >= 0:
117
+ violation_types.append("In Red Lane")
 
118
 
119
  sub_results = model(motorbike_crop)
120
+ for sub_box in sub_results[0].boxes:
121
+ sub_x1, sub_y1, sub_x2, sub_y2 = map(int, sub_box.xyxy[0].cpu().numpy())
122
+ sub_label = model.names[int(sub_box.cls)]
 
 
 
 
 
 
 
123
  if sub_label == 'No_Helmet':
124
+ violation_types.append("No Helmet")
125
+ elif sub_label == 'License_plate':
 
 
126
  license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
127
+ if violation_types:
 
128
  cv2.imwrite(violation_image_path, frame)
129
  license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
130
+ license_plate_pil.save('license_plate.png')
 
131
  license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
132
  filtered_text = filter_license_plate_text(license_plate_text)
 
133
  if filtered_text:
134
  if filtered_text not in violations_dict:
135
+ violations_dict[filtered_text] = violation_types
136
+ send_email(filtered_text, violation_image_path, ', '.join(violation_types))
137
  else:
138
+ current = set(violations_dict[filtered_text])
139
+ new = set(violation_types)
140
+ updated = current | new
141
+ if updated != current:
142
+ violations_dict[filtered_text] = list(updated)
143
+ send_email(filtered_text, violation_image_path, ', '.join(updated))
 
 
144
  arabic_text = convert_to_arabic(filtered_text)
145
+ frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, 30, (255, 255, 255))
146
+ frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, 30, (0, 255, 0))
147
+ return frame