import streamlit as st from processor import process_frame, process_image, process_video import os from PIL import Image import tempfile import cv2 from streamlit_webrtc import VideoTransformerBase, webrtc_streamer, WebRtcMode, ClientSettings import av import asyncio # Configure Streamlit page st.set_page_config(page_title="🚦 Traffic Violation Detection", layout="wide") st.title("🚦 Traffic Violation Detection App") # Sidebar for selection st.sidebar.title("Choose an Option") option = st.sidebar.radio("Select the processing type:", ("Image", "Video", "Live Camera")) if option == "Image": st.header("🖼️ Image Processing") uploaded_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: # Save the uploaded image to a temporary file with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_image: temp_image.write(uploaded_file.read()) temp_image_path = temp_image.name # Display the uploaded image st.image(uploaded_file, caption='Uploaded Image.', use_column_width=True) # Process the image if st.button("Process Image"): with st.spinner("Processing..."): font_path = "fonts/alfont_com_arial-1.ttf" # Update the path as needed processed_image = process_image(temp_image_path, font_path) if processed_image is not None: # Convert the processed image to RGB processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB) st.image(processed_image_rgb, caption='Processed Image.', use_column_width=True) # Save processed image to a temporary file result_image = Image.fromarray(processed_image_rgb) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: result_image.save(tmp.name) tmp_path = tmp.name # Download button with open(tmp_path, "rb") as file: btn = st.download_button( label="📥 Download Processed Image", data=file, file_name="processed_image.jpg", mime="image/jpeg" ) else: st.error("Failed to process the image.") elif option == "Video": st.header("🎥 Video Processing") video_files = [f for f in os.listdir("videos") if f.endswith(('.mp4', '.avi', '.mov'))] if not video_files: st.warning("No predefined videos found in the 'videos/' directory.") else: selected_video = st.selectbox("Select a video to process:", video_files) video_path = os.path.join("videos", selected_video) st.video(video_path) if st.button("Process Video"): with st.spinner("Processing..."): font_path = "fonts/alfont_com_arial-1.ttf" # Update the path as needed processed_video_path = process_video(video_path, font_path) if processed_video_path and os.path.exists(processed_video_path): st.success("Video processed successfully!") st.video(processed_video_path) # Provide download button with open(processed_video_path, "rb") as file: btn = st.download_button( label="📥 Download Processed Video", data=file, file_name="processed_video.mp4", mime="video/mp4" ) else: st.error("Failed to process the video.") elif option == "Live Camera": st.header("📷 Live Camera Processing") st.warning("Live camera processing is in progress. Please allow camera access.") # Define settings for WebRTC WEBRTC_CLIENT_SETTINGS = ClientSettings( rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]}, media_stream_constraints={"video": True, "audio": False}, ) class VideoTransformer(VideoTransformerBase): def __init__(self): self.font_path = "fonts/alfont_com_arial-1.ttf" # Update the path as needed def transform(self, frame): img = frame.to_ndarray(format="bgr24") processed_img = process_frame(img, self.font_path) if processed_img is not None: return processed_img return img webrtc_ctx = webrtc_streamer( key="live-camera", mode=WebRtcMode.SENDRECV, client_settings=WEBRTC_CLIENT_SETTINGS, video_transformer_factory=VideoTransformer, async_transform=True, ) st.info("Live camera feed is being processed. Detected violations will be annotated on the video stream.")